Skip to content

Commit

Permalink
spark ui 重写
Browse files Browse the repository at this point in the history
  • Loading branch information
sven0726 committed Jul 31, 2017
1 parent 9b0ba3b commit 00cff81
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 12 deletions.
49 changes: 49 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/XSparkUI.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.apache.spark.deploy

import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging

/**
* Created by Qiniu on 31/7/2017.
*/
object XSparkUI extends Logging{

private final val ENV_XSPARK_AGENT = "XSPARK_AGENT"

final val WORKER_DOMAIN = "worker_domain"
final val DRIVER_DOMAIN = "driver_domain"
final val CLUSTER_DOMAIN = "spark_cluster_domain"
final val POUND = "#"

def retrieveXSparkAP(domainType: String): String = synchronized {
sys.env.foreach { env =>
logDebug(env._1 + ":" + env._2)
}
try {
val xsparkAgentHost = sys.env.get(ENV_XSPARK_AGENT).getOrElse(POUND)
val domain_api = s"http://${xsparkAgentHost}/api/domain_mapping"
logDebug(s"Query domain mapping from ${domain_api} ...")
val result = scala.io.Source.fromURL(domain_api).mkString
val domain_mapping = scala.util.parsing.json.JSON.parseFull(result)
val apDomain = domain_mapping match {
case Some(m: Map[String, Any]) => m(domainType) match {
case domain: String =>
logInfo(s"Get domain ${domainType} is ${domain}")
if(domainType == WORKER_DOMAIN) {
return domain.replace("http://", "")
}
return domain
case None => POUND
}
}
apDomain.toString()
}
catch {
case e: Exception =>
logError("Failed to retrieve xspark ap domain", e)
// We would rather return POUND(#) than throw exceptions to ui
POUND
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.ExecutorState
import org.apache.spark.deploy.{ExecutorState, XSparkUI}
import org.apache.spark.deploy.master.ExecutorDesc
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
Expand All @@ -35,6 +35,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
def render(request: HttpServletRequest): Seq[Node] = {
// stripXSS is called first to remove suspicious characters used in XSS attacks
val appId = UIUtils.stripXSS(request.getParameter("appId"))
val driverDomain = XSparkUI.retrieveXSparkAP(XSparkUI.DRIVER_DOMAIN)
val state = master.askSync[MasterStateResponse](RequestMasterState)
val app = state.activeApps.find(_.id == appId)
.getOrElse(state.completedApps.find(_.id == appId).orNull)
Expand Down Expand Up @@ -89,8 +90,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
{
if (!app.isFinished) {
<li><strong>
<a href={UIUtils.makeHref(parent.master.reverseProxy,
app.id, app.desc.appUiUrl)}>Application Detail UI</a>
<a href={driverDomain}>Application Detail UI</a>
</strong></li>
}
}
Expand All @@ -114,21 +114,23 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
}

private def executorRow(executor: ExecutorDesc): Seq[Node] = {
val workerDomain = XSparkUI.retrieveXSparkAP(XSparkUI.WORKER_DOMAIN)
val workerUrl = "%s%s.%s".format("http://", executor.worker.host, workerDomain)
val workerUrlRef = UIUtils.makeHref(parent.master.reverseProxy,
executor.worker.id, executor.worker.webUiAddress)
<tr>
<td>{executor.id}</td>
<td>
<a href={workerUrlRef}>{executor.worker.id}</a>
<a href={workerUrl}>{executor.worker.id}</a>
</td>
<td>{executor.cores}</td>
<td>{executor.memory}</td>
<td>{executor.state}</td>
<td>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
.format(workerUrlRef, executor.application.id, executor.id)}>stdout</a>
.format(workerUrl, executor.application.id, executor.id)}>stdout</a>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
.format(workerUrlRef, executor.application.id, executor.id)}>stderr</a>
.format(workerUrl, executor.application.id, executor.id)}>stderr</a>
</td>
</tr>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.xml.Node
import org.json4s.JValue

import org.apache.spark.deploy.DeployMessages.{KillDriverResponse, MasterStateResponse, RequestKillDriver, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.{JsonProtocol, XSparkUI}
import org.apache.spark.deploy.master._
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -176,11 +176,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
}

private def workerRow(worker: WorkerInfo): Seq[Node] = {
val workerDomain = XSparkUI.retrieveXSparkAP(XSparkUI.WORKER_DOMAIN)
val workerNewDomain = "http://%s.%s".format(worker.host, workerDomain)
<tr>
<td>
{
if (worker.isAlive()) {
<a href={UIUtils.makeHref(parent.master.reverseProxy, worker.id, worker.webUiAddress)}>
<a href={workerNewDomain}>
{worker.id}
</a>
} else {
Expand Down Expand Up @@ -220,8 +222,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
if (app.isFinished) {
app.desc.name
} else {
<a href={UIUtils.makeHref(parent.master.reverseProxy,
app.id, app.desc.appUiUrl)}>{app.desc.name}</a>
<a href={"app?appId=" + app.id}>{app.desc.name}</a>
}
}
</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.xml.Node
import org.json4s.JValue

import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.{JsonProtocol, XSparkUI}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.{UIUtils, WebUIPage}
Expand All @@ -40,6 +40,7 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {

def render(request: HttpServletRequest): Seq[Node] = {
val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
val masterDomain = XSparkUI.retrieveXSparkAP(XSparkUI.CLUSTER_DOMAIN)

val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
val runningExecutors = workerState.executors
Expand Down Expand Up @@ -70,7 +71,7 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
<li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)}
({Utils.megabytesToString(workerState.memoryUsed)} Used)</li>
</ul>
<p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
<p><a href={masterDomain}>Back to Master</a></p>
</div>
</div>
<div class="row-fluid"> <!-- Executors and Drivers -->
Expand Down

0 comments on commit 00cff81

Please sign in to comment.