Ability to kill jobs through the UI.
This behavior can be turned on by setting the following variable: spark.ui.killEnabled=true (default=false).
Adding DAGScheduler event StageCancelled and corresponding handlers.
Added cancellation reason to handlers.

Author: Sundeep Narravula <[email protected]>
Sundeep Narravula authored and Sundeep Narravula committed Apr 9, 2014
1 parent b9e0c93 commit 8d97923
Showing 9 changed files with 74 additions and 13 deletions.
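
For context, a minimal sketch (not part of this commit) of how the new flag and the SparkContext cancellation methods might be exercised; the app name, master, and the job/stage IDs below are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Enable the kill switch introduced by this commit (it defaults to false).
val conf = new SparkConf()
  .setAppName("kill-ui-demo")            // placeholder app name
  .setMaster("local[2]")                 // placeholder master
  .set("spark.ui.killEnabled", "true")
val sc = new SparkContext(conf)

// The commit also adds programmatic cancellation to SparkContext.
// The IDs below stand in for real job/stage IDs.
sc.cancelJob(0)      // cancel a job that is scheduled or running
sc.cancelStage(0)    // cancel a stage and all jobs associated with it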
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1076,6 +1076,16 @@ class SparkContext(
dagScheduler.cancelAllJobs()
}

/** Cancel a given job if it's scheduled or running */
def cancelJob(jobId: Int) {
dagScheduler.cancelJob(jobId)
}

/** Cancel a given stage and all jobs associated with it */
def cancelStage(stageId: Int) {
dagScheduler.cancelStage(stageId)
}

/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
32 changes: 27 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -494,7 +494,7 @@ class DAGScheduler(
/**
* Cancel a job that is running or waiting in the queue.
*/
def cancelJob(jobId: Int) {
private[spark] def cancelJob(jobId: Int) {
logInfo("Asked to cancel job " + jobId)
eventProcessActor ! JobCancelled(jobId)
}
@@ -511,6 +511,13 @@ class DAGScheduler(
eventProcessActor ! AllJobsCancelled
}

/**
* Cancel all jobs associated with a running or scheduled stage.
*/
def cancelStage(stageId: Int) {
eventProcessActor ! StageCancelled(stageId)
}

/**
* Process one event retrieved from the event processing actor.
*
@@ -551,6 +558,9 @@ class DAGScheduler(
submitStage(finalStage)
}

case StageCancelled(stageId) =>
handleStageCancellation(stageId)

case JobCancelled(jobId) =>
handleJobCancellation(jobId)

@@ -560,11 +570,11 @@ class DAGScheduler(
val activeInGroup = activeJobs.filter(activeJob =>
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation)
jobIds.foreach(jobId => handleJobCancellation(jobId, "as part of cancelled job group %s".format(groupId)))

case AllJobsCancelled =>
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation)
runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId, "as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...

@@ -991,11 +1001,23 @@ class DAGScheduler(
}
}

private def handleJobCancellation(jobId: Int) {
private def handleStageCancellation(stageId: Int) {
if (stageIdToJobIds.contains(stageId)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray.sorted
jobsThatUseStage.foreach(jobId => {
handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
})
} else {
logInfo("No active jobs to kill for Stage " + stageId)
}
}

private def handleJobCancellation(jobId: Int, reason: String = "") {
if (!jobIdToStageIds.contains(jobId)) {
logDebug("Trying to cancel unregistered job " + jobId)
} else {
failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
failJobAndIndependentStages(jobIdToActiveJob(jobId),
"Job %d cancelled %s".format(jobId, reason), None)
}
}

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent

private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent

private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent

private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -46,6 +46,7 @@ private[spark] class SparkUI(
val live = sc != null

val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
val killEnabled = conf.get("spark.ui.killEnabled", "false").toBoolean

private val bindHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -41,7 +41,8 @@ private[ui] class IndexPage(parent: JobProgressUI) {
val failedStages = listener.failedStages.reverse.toSeq
val now = System.currentTimeMillis()

val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
val activeStagesTable =
new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
val completedStagesTable =
new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -61,6 +61,7 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
val stageIdToPool = HashMap[Int, String]()
val stageIdToDescription = HashMap[Int, String]()
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
val jobIdToStageIds = HashMap[Int, Seq[Int]]()

val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -33,6 +33,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
val basePath = parent.basePath
val live = parent.live
val sc = parent.sc
val killEnabled = parent.killEnabled

lazy val listener = _listener.get
lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
@@ -56,5 +57,5 @@ private[ui] class JobProgressUI(parent: SparkUI) {
(request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
createServletHandler("/stages",
(request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
)
)
}
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -31,11 +31,21 @@ private[ui] class StagePage(parent: JobProgressUI) {
private val appName = parent.appName
private val basePath = parent.basePath
private lazy val listener = parent.listener
private lazy val sc = parent.sc
private val killEnabled = parent.killEnabled

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt

if (killEnabled) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean

if (killFlag && listener.activeStages.contains(stageId)) {
sc.cancelStage(stageId)
}
}

if (!listener.stageIdToTaskData.contains(stageId)) {
val content =
<div>
25 changes: 19 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils

/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI, killEnabled: Boolean = false) {
private val basePath = parent.basePath
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
@@ -71,15 +71,28 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
</div>
}

/** Render an HTML row that represents a stage */
private def stageRow(s: StageInfo): Seq[Node] = {
val poolName = listener.stageIdToPool.get(s.stageId)
private def makeDescription(s: StageInfo): Seq[Node] = {
val nameLink =
<a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
{s.name}
</a>
val killButton = if (killEnabled) {
<form action={"%s/stages/stage/".format(UIUtils.prependBaseUri(basePath))}>
<input type="hidden" value={"true"} name="terminate" />
<input type="hidden" value={"" + s.stageId} name="id" />
<input type="submit" value="Terminate Job"/>
</form>
}
val description = listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
.map(d => <div><em>{d}</em></div><div>{nameLink} {killButton}</div>)
.getOrElse(<div>{nameLink} {killButton}</div>)

return description
}

/** Render an HTML row that represents a stage */
private def stageRow(s: StageInfo): Seq[Node] = {
val poolName = listener.stageIdToPool.get(s.stageId)
val submissionTime = s.submissionTime match {
case Some(t) => WebUI.formatDate(new Date(t))
case None => "Unknown"
@@ -118,7 +131,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
</a>
</td>
}}
<td>{description}</td>
<td>{makeDescription(s)}</td>
<td valign="middle">{submissionTime}</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
<td class="progress-cell">
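
For reference, a minimal sketch of the request that the new "Terminate Job" form submits when clicked; it assumes the driver UI runs on the default port 4040 and that stage 3 (a placeholder id) is currently active. StagePage reads the terminate and id parameters and calls sc.cancelStage only when the stage is still in listener.activeStages:

import scala.io.Source

// Equivalent of clicking the kill button for stage 3 on the stages page.
val killUrl = "http://localhost:4040/stages/stage/?terminate=true&id=3"
val stagePageHtml = Source.fromURL(killUrl).mkString   // issuing the GET triggers the cancel
println(stagePageHtml.take(200))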
