[SPARK-40834][SQL][FOLLOWUP] Take care of legacy query end events
### What changes were proposed in this pull request?

This is a follow-up of apache#38302. Events generated by old versions of Spark do not have the new `errorMessage` field, so for those events we should fall back to the old way of detecting the query execution status (failed or not).

This PR also adds a UI test for the expected behavior.
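
To make the new convention concrete, here is a minimal sketch (illustrative only, not Spark source; `isFailedExecution` and its parameters are made-up names) of how a consumer distinguishes the three possible states of `errorMessage` after this change:

```scala
// Contract after this PR:
//   None      -> event written by an old Spark version; fall back to job status
//   Some("")  -> query execution finished successfully
//   Some(msg) -> query execution failed with message `msg`
def isFailedExecution(errorMessage: Option[String], anyJobFailed: => Boolean): Boolean =
  errorMessage match {
    case Some(msg) => msg.nonEmpty   // new-style event: empty string means success
    case None      => anyJobFailed   // legacy event: derive the status from its jobs
  }
```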

### Why are the changes needed?

Backward compatibility.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes apache#38747 from cloud-fan/ui.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan committed Nov 23, 2022
1 parent e42d383 commit 79f8c79
Showing 6 changed files with 72 additions and 10 deletions.
@@ -121,7 +121,11 @@ object SQLExecution {
       SparkThrowableHelper.getMessage(e)
     }
     val event = SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis(), errorMessage)
+      executionId,
+      System.currentTimeMillis(),
+      // Use empty string to indicate no error, as None may mean events generated by old
+      // versions of Spark.
+      errorMessage.orElse(Some("")))
     // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
     // parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
     // can specify the execution name in more places in the future, so that
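For reference on the `errorMessage.orElse(Some(""))` normalization above: `Option.orElse` only substitutes the fallback when the option is empty, so a real failure message passes through unchanged. A standalone sketch (not Spark source):

```scala
val success: Option[String] = None
val failure: Option[String] = Some("java.lang.ArithmeticException: / by zero")

success.orElse(Some(""))  // Some("") -- "no error" is now encoded explicitly
failure.orElse(Some(""))  // Some("java.lang.ArithmeticException: / by zero") -- unchanged
```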
@@ -42,11 +42,24 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L

     sqlStore.executionsList().foreach { e =>
       if (e.errorMessage.isDefined) {
-        failed += e
-      } else if (e.completionTime.nonEmpty) {
-        completed += e
-      } else {
+        if (e.errorMessage.get.isEmpty) {
+          completed += e
+        } else {
+          failed += e
+        }
+      } else if (e.completionTime.isEmpty) {
         running += e
+      } else {
+        // When `completionTime` is present, it means the query execution is completed and
+        // `errorMessage` should be present as well. However, events generated by old versions of
+        // Spark do not have the `errorMessage` field. We have to check the status of this query
+        // execution's jobs.
+        val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+        if (isFailed) {
+          failed += e
+        } else {
+          completed += e
+        }
       }
     }

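The new fallback branch classifies a legacy execution as failed when any of its jobs failed, via `exists` over the job-status map. A self-contained illustration with toy data (assumes only `spark-core` on the classpath for `JobExecutionStatus`):

```scala
import org.apache.spark.JobExecutionStatus

// Toy job-status map: job id -> final status, as tracked per SQL execution.
val jobs = Map(
  0 -> JobExecutionStatus.SUCCEEDED,
  1 -> JobExecutionStatus.FAILED)

// Same predicate as the fallback above: one failed job marks the whole execution failed.
val isFailed = jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
// isFailed == true
```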
@@ -489,13 +489,13 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var metrics = Seq[SQLPlanMetric]()
   var submissionTime = -1L
   var completionTime: Option[Date] = None
+  var errorMessage: Option[String] = None

   var jobs = Map[Int, JobExecutionStatus]()
   var stages = Set[Int]()
   var driverAccumUpdates = Seq[(Long, Long)]()

   @volatile var metricsValues: Map[Long, String] = null
-  var errorMessage: Option[String] = None

   // Just in case job end and execution end arrive out of order, keep track of how many
   // end events arrived so that the listener can stop tracking the execution.
@@ -511,10 +511,10 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       metrics,
       submissionTime,
       completionTime,
+      errorMessage,
       jobs,
       stages,
-      metricsValues,
-      errorMessage)
+      metricsValues)
   }

 }
@@ -90,6 +90,7 @@ class SQLExecutionUIData(
     val metrics: Seq[SQLPlanMetric],
     val submissionTime: Long,
     val completionTime: Option[Date],
+    val errorMessage: Option[String],
     @JsonDeserialize(keyAs = classOf[Integer])
     val jobs: Map[Int, JobExecutionStatus],
     @JsonDeserialize(contentAs = classOf[Integer])
@@ -100,8 +101,7 @@
      * from the SQL listener instance.
      */
     @JsonDeserialize(keyAs = classOf[JLong])
-    val metricValues: Map[Long, String],
-    val errorMessage: Option[String]) {
+    val metricValues: Map[Long, String]) {

   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
@@ -55,6 +55,9 @@ case class SparkListenerSQLExecutionStart(
 case class SparkListenerSQLExecutionEnd(
     executionId: Long,
     time: Long,
+    // For backward compatibility, the `errorMessage` will be None when we parse event logs
+    // generated by old versions of Spark. It should always be Some in Spark 3.4+ and empty string
+    // means there is no error during execution.
     errorMessage: Option[String] = None)
   extends SparkListenerEvent {

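With the default value shown above, the three states fall out naturally when events are constructed or replayed. A small sketch (assumes the `org.apache.spark.sql.execution.ui` package path; arguments as declared above):

```scala
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

val now = System.currentTimeMillis()

// New-style events always carry Some(...); the empty string encodes success.
val succeeded = SparkListenerSQLExecutionEnd(1, now, Some(""))
val failed    = SparkListenerSQLExecutionEnd(2, now, Some("Job aborted."))

// An event parsed from an old event log has no such field in its JSON, so it
// deserializes with the default None -- the case the UI now special-cases.
val legacy = SparkListenerSQLExecutionEnd(3, now)
```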
@@ -26,6 +26,7 @@ import scala.xml.Node
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.scalatest.BeforeAndAfter

+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd, SparkListenerJobStart}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
@@ -35,6 +36,11 @@ import org.apache.spark.util.kvstore.InMemoryStore

 class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {

+  override def sparkConf: SparkConf = {
+    // Disable async kv store write in the UI, to make tests more stable here.
+    super.sparkConf.set(org.apache.spark.internal.config.Status.ASYNC_TRACKING_ENABLED, false)
+  }
+
   import testImplicits._

   var kvstore: ElementTrackingStore = _
@@ -60,6 +66,42 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     assert(!html.contains("1970/01/01"))
   }

+  test("SPARK-40834: prioritize `errorMessage` over job failures") {
+    val statusStore = createStatusStore
+    val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
+    when(tab.sqlStore).thenReturn(statusStore)
+
+    val request = mock(classOf[HttpServletRequest])
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+
+    Seq(Some(""), Some("testErrorMsg"), None).foreach { msg =>
+      val listener = statusStore.listener.get
+      val page = new AllExecutionsPage(tab)
+      val df = createTestDataFrame
+      listener.onOtherEvent(SparkListenerSQLExecutionStart(
+        0,
+        "test",
+        "test",
+        df.queryExecution.toString,
+        SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+        System.currentTimeMillis()))
+      listener.onJobStart(SparkListenerJobStart(
+        jobId = 0,
+        time = System.currentTimeMillis(),
+        stageInfos = Nil,
+        createProperties(0)))
+      listener.onJobEnd(SparkListenerJobEnd(
+        jobId = 0,
+        time = System.currentTimeMillis(),
+        JobFailed(new RuntimeException("Oops"))))
+      listener.onOtherEvent(SparkListenerSQLExecutionEnd(0, System.currentTimeMillis(), msg))
+      val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+      assert(html.contains("failed queries") == !msg.contains(""))
+    }
+  }
+
   test("sorting should be successful") {
     val statusStore = createStatusStore
     val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
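One subtlety in the new test's assertion: it relies on `Option.contains`, where `msg.contains("")` is true only for `Some("")`. The failed-queries table must therefore appear in exactly the other two cases:

```scala
Some("").contains("")               // true  -> success; "failed queries" absent
Some("testErrorMsg").contains("")   // false -> explicit failure; table shown
(None: Option[String]).contains("") // false -> legacy event + failed job; table shown
```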
