diff --git a/core/src/main/scala/org/apache/spark/deploy/XSparkUI.scala b/core/src/main/scala/org/apache/spark/deploy/XSparkUI.scala new file mode 100644 index 0000000000000..7b952d40586ed --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/XSparkUI.scala @@ -0,0 +1,49 @@ +package org.apache.spark.deploy + +import org.apache.commons.lang3.StringUtils +import org.apache.spark.internal.Logging + +/** + * Created by Qiniu on 31/7/2017. + */ +object XSparkUI extends Logging{ + + private final val ENV_XSPARK_AGENT = "XSPARK_AGENT" + + final val WORKER_DOMAIN = "worker_domain" + final val DRIVER_DOMAIN = "driver_domain" + final val CLUSTER_DOMAIN = "spark_cluster_domain" + final val POUND = "#" + + def retrieveXSparkAP(domainType: String): String = synchronized { + sys.env.foreach { env => + logDebug(env._1 + ":" + env._2) + } + try { + val xsparkAgentHost = sys.env.get(ENV_XSPARK_AGENT).getOrElse(POUND) + val domain_api = s"http://${xsparkAgentHost}/api/domain_mapping" + logDebug(s"Query domain mapping from ${domain_api} ...") + val result = scala.io.Source.fromURL(domain_api).mkString + val domain_mapping = scala.util.parsing.json.JSON.parseFull(result) + val apDomain = domain_mapping match { + case Some(m: Map[String, Any]) => m(domainType) match { + case domain: String => + logInfo(s"Get domain ${domainType} is ${domain}") + if(domainType == WORKER_DOMAIN) { + return domain.replace("http://", "") + } + return domain + case None => POUND + } + } + apDomain.toString() + } + catch { + case e: Exception => + logError("Failed to retrieve xspark ap domain", e) + // We would rather return POUND(#) than throw exceptions to ui + POUND + } + } + +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 94ff81c1a68ea..f0bfec9e5aa78 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} -import org.apache.spark.deploy.ExecutorState +import org.apache.spark.deploy.{ExecutorState, XSparkUI} import org.apache.spark.deploy.master.ExecutorDesc import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils @@ -35,6 +35,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") def render(request: HttpServletRequest): Seq[Node] = { // stripXSS is called first to remove suspicious characters used in XSS attacks val appId = UIUtils.stripXSS(request.getParameter("appId")) + val driverDomain = XSparkUI.retrieveXSparkAP(XSparkUI.DRIVER_DOMAIN) val state = master.askSync[MasterStateResponse](RequestMasterState) val app = state.activeApps.find(_.id == appId) .getOrElse(state.completedApps.find(_.id == appId).orNull) @@ -89,8 +90,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") { if (!app.isFinished) {
  • - Application Detail UI + Application Detail UI
  • } } @@ -114,21 +114,23 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") } private def executorRow(executor: ExecutorDesc): Seq[Node] = { + val workerDomain = XSparkUI.retrieveXSparkAP(XSparkUI.WORKER_DOMAIN) + val workerUrl = "%s%s.%s".format("http://", executor.worker.host, workerDomain) val workerUrlRef = UIUtils.makeHref(parent.master.reverseProxy, executor.worker.id, executor.worker.webUiAddress) {executor.id} - {executor.worker.id} + {executor.worker.id} {executor.cores} {executor.memory} {executor.state} stdout + .format(workerUrl, executor.application.id, executor.id)}>stdout stderr + .format(workerUrl, executor.application.id, executor.id)}>stderr } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index ce71300e9097d..a94f74e7a65bf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -24,7 +24,7 @@ import scala.xml.Node import org.json4s.JValue import org.apache.spark.deploy.DeployMessages.{KillDriverResponse, MasterStateResponse, RequestKillDriver, RequestMasterState} -import org.apache.spark.deploy.JsonProtocol +import org.apache.spark.deploy.{JsonProtocol, XSparkUI} import org.apache.spark.deploy.master._ import org.apache.spark.ui.{UIUtils, WebUIPage} import org.apache.spark.util.Utils @@ -176,11 +176,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { } private def workerRow(worker: WorkerInfo): Seq[Node] = { + val workerDomain = XSparkUI.retrieveXSparkAP(XSparkUI.WORKER_DOMAIN) + val workerNewDomain = "http://%s.%s".format(worker.host, workerDomain) { if (worker.isAlive()) { - + {worker.id} } else { @@ -220,8 +222,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { if (app.isFinished) { app.desc.name } else { - {app.desc.name} + {app.desc.name} } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index 1ad973122b609..2c8b880055208 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -24,7 +24,7 @@ import scala.xml.Node import org.json4s.JValue import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} -import org.apache.spark.deploy.JsonProtocol +import org.apache.spark.deploy.{JsonProtocol, XSparkUI} import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} import org.apache.spark.ui.{UIUtils, WebUIPage} @@ -40,6 +40,7 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState) + val masterDomain = XSparkUI.retrieveXSparkAP(XSparkUI.CLUSTER_DOMAIN) val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs") val runningExecutors = workerState.executors @@ -70,7 +71,7 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
  • Memory: {Utils.megabytesToString(workerState.memory)} ({Utils.megabytesToString(workerState.memoryUsed)} Used)
  • -

    Back to Master

    +

    Back to Master