Skip to content

Commit

Permalink
Catch exception
Browse files Browse the repository at this point in the history
  • Loading branch information
unknown authored and unknown committed Apr 3, 2015
1 parent 6b47ff7 commit 6483a2a
Showing 1 changed file with 50 additions and 41 deletions.
91 changes: 50 additions & 41 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
Expand Down Expand Up @@ -559,50 +560,56 @@ private[spark] class Client(
var lastState: YarnApplicationState = null
while (true) {
Thread.sleep(interval)
val report = getApplicationReport(appId)
val state = report.getYarnApplicationState

if (logApplicationReport) {
logInfo(s"Application report for $appId (state: $state)")
val details = Seq[(String, String)](
("client token", getClientToken(report)),
("diagnostics", report.getDiagnostics),
("ApplicationMaster host", report.getHost),
("ApplicationMaster RPC port", report.getRpcPort.toString),
("queue", report.getQueue),
("start time", report.getStartTime.toString),
("final status", report.getFinalApplicationStatus.toString),
("tracking URL", report.getTrackingUrl),
("user", report.getUser)
)

// Use more loggable format if value is null or empty
val formattedDetails = details
.map { case (k, v) =>
val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
s"\n\t $k: $newValue" }
.mkString("")

// If DEBUG is enabled, log report details every iteration
// Otherwise, log them every time the application changes state
if (log.isDebugEnabled) {
logDebug(formattedDetails)
} else if (lastState != state) {
logInfo(formattedDetails)
try {
val report = getApplicationReport(appId)
val state = report.getYarnApplicationState

if (logApplicationReport) {
logInfo(s"Application report for $appId (state: $state)")
val details = Seq[(String, String)](
("client token", getClientToken(report)),
("diagnostics", report.getDiagnostics),
("ApplicationMaster host", report.getHost),
("ApplicationMaster RPC port", report.getRpcPort.toString),
("queue", report.getQueue),
("start time", report.getStartTime.toString),
("final status", report.getFinalApplicationStatus.toString),
("tracking URL", report.getTrackingUrl),
("user", report.getUser)
)

// Use more loggable format if value is null or empty
val formattedDetails = details
.map { case (k, v) =>
val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
s"\n\t $k: $newValue" }
.mkString("")

// If DEBUG is enabled, log report details every iteration
// Otherwise, log them every time the application changes state
if (log.isDebugEnabled) {
logDebug(formattedDetails)
} else if (lastState != state) {
logInfo(formattedDetails)
}
}
}

if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
return (state, report.getFinalApplicationStatus)
}
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
return (state, report.getFinalApplicationStatus)
}

if (returnOnRunning && state == YarnApplicationState.RUNNING) {
return (state, report.getFinalApplicationStatus)
}
if (returnOnRunning && state == YarnApplicationState.RUNNING) {
return (state, report.getFinalApplicationStatus)
}

lastState = state
lastState = state
} catch {
case e: ApplicationNotFoundException =>
logError(s"Application $appId not found.")
return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
}
}

// Never reached, but keeps compiler happy
Expand Down Expand Up @@ -809,7 +816,9 @@ object Client extends Logging {
}
}
addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env)
populateHadoopClasspath(conf, env)
if (sparkConf.getBoolean("spark.yarn.includeClusterHadoopClasspath", true)) {
populateHadoopClasspath(conf, env)
}
sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env))
}

Expand Down

0 comments on commit 6483a2a

Please sign in to comment.