Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-1202: Improvements to task killing in the UI. #386

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,12 @@ table.sortable thead {
background-repeat: repeat-x;
filter: progid:dximagetransform.microsoft.gradient(startColorstr='#FFA4EDFF', endColorstr='#FF94DDFF', GradientType=0);
}

span.kill-link {
margin-right: 2px;
color: gray;
}

span.kill-link a {
color: gray;
}
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ private[spark] object JettyUtils extends Logging {
def createRedirectHandler(
srcPath: String,
destPath: String,
beforeRedirect: HttpServletRequest => Unit = x => (),
basePath: String = ""): ServletContextHandler = {
val prefixedDestPath = attachPrefix(basePath, destPath)
val servlet = new HttpServlet {
override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
beforeRedirect(request)
// Make sure we don't end up with "//" in the middle
val newUrl = new URL(new URL(request.getRequestURL.toString), prefixedDestPath).toString
response.sendRedirect(newUrl)
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
/** Top level user interface for Spark */
private[spark] class SparkUI(
val sc: SparkContext,
conf: SparkConf,
val conf: SparkConf,
val listenerBus: SparkListenerBus,
var appName: String,
val basePath: String = "")
Expand All @@ -46,7 +46,6 @@ private[spark] class SparkUI(
val live = sc != null

val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)

private val localHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
Expand All @@ -70,7 +69,7 @@ private[spark] class SparkUI(
metricsServletHandlers ++
Seq[ServletContextHandler] (
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"),
createRedirectHandler("/", "/stages", basePath)
createRedirectHandler("/", "/stages", basePath = basePath)
)
}

Expand Down
11 changes: 0 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ private[ui] class IndexPage(parent: JobProgressUI) {
private val sc = parent.sc
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
private val killEnabled = parent.killEnabled

private def appName = parent.appName

Expand All @@ -43,16 +42,6 @@ private[ui] class IndexPage(parent: JobProgressUI) {
val failedStages = listener.failedStages.reverse.toSeq
val now = System.currentTimeMillis()

if (killEnabled) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt

if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
sc.cancelStage(stageId)
}
}


val activeStagesTable =
new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
val completedStagesTable =
Expand Down
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
val basePath = parent.basePath
val live = parent.live
val sc = parent.sc
val killEnabled = parent.killEnabled
val killEnabled = parent.conf.getBoolean("spark.ui.killEnabled", true)

lazy val listener = _listener.get
lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
Expand All @@ -51,7 +51,22 @@ private[ui] class JobProgressUI(parent: SparkUI) {

def formatDuration(ms: Long) = Utils.msDurationToString(ms)

private def handleKillRequest(request: HttpServletRequest) = {
if (killEnabled) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
sc.cancelStage(stageId)
}
// Do a quick pause here to give Spark time to kill the stage so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
// time should be limited in duration.
Thread.sleep(100)
}
}

def getHandlers = Seq[ServletContextHandler](
createRedirectHandler("/stages/stage/kill", "/stages", handleKillRequest),
createServletHandler("/stages/stage",
(request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath),
createServletHandler("/stages/pool",
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,22 @@ private[ui] class StageTable(
}

private def makeDescription(s: StageInfo): Seq[Node] = {
// scalastyle:off
val killLink = if (killEnabled) {
<span class="kill-link">
(<a href={"%s/stages/stage/kill?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>kill</a>)
</span>
}
// scalastyle:on

val nameLink =
<a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
{s.name}
</a>
val killLink = if (killEnabled) {
<div>[<a href=
{"%s/stages?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
Kill
</a>]</div>

}
val description = listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
.getOrElse(<div>{nameLink} {killLink}</div>)
.getOrElse(<div> {killLink}{nameLink}</div>)

return description
}
Expand Down