-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1104: kill Process in workerThread of ExecutorRunner #35
Changes from 3 commits
1d511c8
0accf2f
eb615ba
3107aeb
85767da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,30 +58,29 @@ private[spark] class ExecutorRunner( | |
override def run() { fetchAndRunExecutor() } | ||
} | ||
workerThread.start() | ||
|
||
// Shutdown hook that kills actors on shutdown. | ||
shutdownHook = new Thread() { | ||
override def run() { | ||
if (process != null) { | ||
logInfo("Shutdown hook killing child process.") | ||
process.destroy() | ||
process.waitFor() | ||
} | ||
killProcess() | ||
} | ||
} | ||
Runtime.getRuntime.addShutdownHook(shutdownHook) | ||
} | ||
|
||
private def killProcess() { | ||
if (process != null) { | ||
logInfo("Killing process!") | ||
process.destroy() | ||
process.waitFor() | ||
process = null | ||
} | ||
} | ||
|
||
/** Stop this executor runner, including killing the process it launched */ | ||
def kill() { | ||
if (workerThread != null) { | ||
workerThread.interrupt() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe add a comment that the worker thread will kill the child process when interrupted. |
||
workerThread = null | ||
if (process != null) { | ||
logInfo("Killing process!") | ||
process.destroy() | ||
process.waitFor() | ||
} | ||
state = ExecutorState.KILLED | ||
worker ! ExecutorStateChanged(appId, execId, state, None, None) | ||
Runtime.getRuntime.removeShutdownHook(shutdownHook) | ||
|
@@ -126,7 +125,6 @@ private[spark] class ExecutorRunner( | |
// parent process for the executor command | ||
env.put("SPARK_LAUNCH_WITH_SCALA", "0") | ||
process = builder.start() | ||
|
||
val header = "Spark Executor Command: %s\n%s\n\n".format( | ||
command.mkString("\"", "\" \"", "\""), "=" * 40) | ||
|
||
|
@@ -142,18 +140,18 @@ private[spark] class ExecutorRunner( | |
// long-lived processes only. However, in the future, we might restart the executor a few | ||
// times on the same machine. | ||
val exitCode = process.waitFor() | ||
killProcess() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The contract of waitFor says the process will already have terminated by the time it returns -- is there a need to call killProcess() here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @aarondav Yes, I think we don't need it now; I originally just wanted to set process to null. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can it be removed now that we don't set process to null there? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think I forgot to do that after I commented. |
||
state = ExecutorState.FAILED | ||
val message = "Command exited with code " + exitCode | ||
worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)) | ||
} catch { | ||
case interrupted: InterruptedException => | ||
case interrupted: InterruptedException => { | ||
logInfo("Runner thread for executor " + fullId + " interrupted") | ||
|
||
killProcess() | ||
} | ||
case e: Exception => { | ||
logError("Error running executor", e) | ||
if (process != null) { | ||
process.destroy() | ||
} | ||
killProcess() | ||
state = ExecutorState.FAILED | ||
val message = e.getClass + ": " + e.getMessage | ||
worker ! ExecutorStateChanged(appId, execId, state, Some(message), None) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting the process to null may actually produce a NullPointerException if we're unlucky with the race condition. Let's just go with the flow and not set process to null -- both destroy and waitFor are resilient to processes that are already dead.