Skip to content

Commit

Permalink
[SPARK-11206] Support SQL UI on the history server
Browse files Browse the repository at this point in the history
On the live web UI, there is a SQL tab which provides valuable information for the SQL query. But once the workload is finished, we won't see the SQL tab on the history server. It will be helpful if we support SQL UI on the history server so we can analyze it even after its execution.

To support SQL UI on the history server:
1. I added an `onOtherEvent` method to the `SparkListener` trait and post all SQL related events to the same event bus.
2. Two SQL events `SparkListenerSQLExecutionStart` and `SparkListenerSQLExecutionEnd` are defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4.  A new trait `SparkHistoryListenerFactory` is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using `java.util.ServiceLoader`.

Author: Carson Wang <[email protected]>

Closes #9297 from carsonwang/SqlHistoryUI.
  • Loading branch information
carsonwang authored and Marcelo Vanzin committed Nov 25, 2015
1 parent 21e5606 commit cc243a0
Show file tree
Hide file tree
Showing 21 changed files with 327 additions and 135 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,5 @@ INDEX
gen-java.*
.*avpr
org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.scheduler.SparkHistoryListenerFactory
.*parquet
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/spark/JavaSparkListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,7 @@ public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }

@Override
public void onOtherEvent(SparkListenerEvent event) { }

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,8 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}

@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ private[spark] class EventLoggingListener(
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = {
logEvent(event, flushLogger = true)
}

/**
* Stop logging events. The event log file will be renamed so that it loses the
* ".inprogress" suffix.
Expand Down
24 changes: 22 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ import java.util.Properties
import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.{Logging, TaskEndReason}
import com.fasterxml.jackson.annotation.JsonTypeInfo

import org.apache.spark.{Logging, SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.ui.SparkUI

@DeveloperApi
sealed trait SparkListenerEvent
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent

@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
Expand Down Expand Up @@ -130,6 +134,17 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
*/
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

/**
* Interface for creating history listeners defined in other modules like SQL, which are used to
* rebuild the history UI.
*/
private[spark] trait SparkHistoryListenerFactory {
/**
* Create listeners used to rebuild the history UI.
*/
def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
}

/**
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
Expand Down Expand Up @@ -223,6 +238,11 @@ trait SparkListener {
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }

/**
* Called when other events like SQL-specific events are posted.
*/
def onOtherEvent(event: SparkListenerEvent) { }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
case _ => listener.onOtherEvent(event)
}
}

Expand Down
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.ui

import java.util.Date
import java.util.{Date, ServiceLoader}

import scala.collection.JavaConverters._

import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
UIRoot}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
Expand Down Expand Up @@ -154,7 +157,16 @@ private[spark] object SparkUI {
appName: String,
basePath: String,
startTime: Long): SparkUI = {
create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
val sparkUI = create(
None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)

val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
Utils.getContextOrSparkClassLoader).asScala
listenerFactories.foreach { listenerFactory =>
val listeners = listenerFactory.createListeners(conf, sparkUI)
listeners.foreach(listenerBus.addListener)
}
sparkUI
}

/**
Expand Down
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@ package org.apache.spark.util

import java.util.{Properties, UUID}

import org.apache.spark.scheduler.cluster.ExecutorInfo

import scala.collection.JavaConverters._
import scala.collection.Map

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._

/**
Expand All @@ -54,6 +56,8 @@ private[spark] object JsonProtocol {

private implicit val format = DefaultFormats

private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */
Expand Down Expand Up @@ -96,6 +100,7 @@ private[spark] object JsonProtocol {
executorMetricsUpdateToJson(metricsUpdate)
case blockUpdated: SparkListenerBlockUpdated =>
throw new MatchError(blockUpdated) // TODO(ekl) implement this
case _ => parse(mapper.writeValueAsString(event))
}
}

Expand Down Expand Up @@ -511,6 +516,8 @@ private[spark] object JsonProtocol {
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
case `metricsUpdate` => executorMetricsUpdateFromJson(json)
case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
.asInstanceOf[SparkListenerEvent]
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory
18 changes: 14 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,8 @@ object SQLContext {
*/
@transient private val instantiatedContext = new AtomicReference[SQLContext]()

@transient private val sqlListener = new AtomicReference[SQLListener]()

/**
* Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
*
Expand Down Expand Up @@ -1307,6 +1309,10 @@ object SQLContext {
Option(instantiatedContext.get())
}

private[sql] def clearSqlListener(): Unit = {
sqlListener.set(null)
}

/**
* Changes the SQLContext that will be returned in this thread and its children when
* SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
Expand Down Expand Up @@ -1355,9 +1361,13 @@ object SQLContext {
* Create a SQLListener then add it into SparkContext, and create an SQLTab if there is SparkUI.
*/
private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
val listener = new SQLListener(sc.conf)
sc.addSparkListener(listener)
sc.ui.foreach(new SQLTab(listener, _))
listener
if (sqlListener.get() == null) {
val listener = new SQLListener(sc.conf)
if (sqlListener.compareAndSet(null, listener)) {
sc.addSparkListener(listener)
sc.ui.foreach(new SQLTab(listener, _))
}
}
sqlListener.get()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart,
SparkListenerSQLExecutionEnd}
import org.apache.spark.util.Utils

private[sql] object SQLExecution {
Expand All @@ -45,25 +46,14 @@ private[sql] object SQLExecution {
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
val r = try {
val callSite = Utils.getCallSite()
sqlContext.listener.onExecutionStart(
executionId,
callSite.shortForm,
callSite.longForm,
queryExecution.toString,
SparkPlanGraph(queryExecution.executedPlan),
System.currentTimeMillis())
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
try {
body
} finally {
// Ideally, we need to make sure onExecutionEnd happens after onJobStart and onJobEnd.
// However, onJobStart and onJobEnd run in the listener thread. Because we cannot add new
// SQL event types to SparkListener since it's a public API, we cannot guarantee that.
//
// SQLListener should handle the case that onExecutionEnd happens before onJobEnd.
//
// The worst case is onExecutionEnd may happen before onJobStart when the listener thread
// is very busy. If so, we cannot track the jobs for the execution. It seems acceptable.
sqlContext.listener.onExecutionEnd(executionId, System.currentTimeMillis())
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
}
} finally {
sc.setLocalProperty(EXECUTION_ID_KEY, null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
* Stores information about a SQL SparkPlan.
*/
@DeveloperApi
class SparkPlanInfo(
val nodeName: String,
val simpleString: String,
val children: Seq[SparkPlanInfo],
val metrics: Seq[SQLMetricInfo])

private[sql] object SparkPlanInfo {

def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
Utils.getFormattedClassName(metric.param))
}
val children = plan.children.map(fromSparkPlan)

new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.metric

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Stores information about a SQL Metric.
*/
@DeveloperApi
class SQLMetricInfo(
val name: String,
val accumulatorId: Long,
val metricParam: String)
Loading

0 comments on commit cc243a0

Please sign in to comment.