
[SPARK-8974] Catch exceptions in allocation schedule task. #7352

Closed
wants to merge 2 commits into from

Conversation

KaiXinXiaoLei

I ran into a problem. When I submit some tasks, the spark-dynamic-executor-allocation thread should send the "requestTotalExecutors" message so that new executors start. But the thread hits an error like:

2015-07-14 19:02:17,461 | WARN | [spark-dynamic-executor-allocation] | Error sending message [message = RequestExecutors(1)] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:57)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:351)
at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1382)
at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:343)
at org.apache.spark.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget(ExecutorAllocationManager.scala:295)
at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:248)

After a few minutes, a new ApplicationMaster starts, the submitted tasks run, and they complete. But even a long time later (e.g., ten minutes), the number of executors does not drop to zero. I use the default value of "spark.dynamicAllocation.minExecutors".

@@ -211,7 +212,16 @@ private[spark] class ExecutorAllocationManager(
listenerBus.addListener(listener)

val scheduleTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(schedule())
override def run(): Unit = {
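(The diff view is truncated here. For context, a sketch of what the new run() body looks like, pieced together from the catch clause quoted later in this thread and the final logError -> logWarning change; the ControlThrowable case and the surrounding executor/intervalMillis names are assumptions, not the exact merged diff:)

    // assumes: import java.util.concurrent.TimeUnit
    //          import scala.util.control.ControlThrowable
    val scheduleTask = new Runnable() {
      override def run(): Unit = {
        try {
          schedule()
        } catch {
          // Let Scala's control-flow throwables (e.g. non-local return) propagate.
          case ct: ControlThrowable =>
            throw ct
          // Swallow everything else: if run() throws, the ScheduledExecutorService
          // suppresses all later executions and the allocation loop silently dies.
          case t: Throwable =>
            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
        }
      }
    }
    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)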
Contributor

It's all the same code.

Member

The thread stops either way. How does this help?

Author

If the thread does not throw an exception, it does not stop.

Member

Right, but, according to the JIRA you are dealing with an exception case. How does this stop the thread from stopping?

@KaiXinXiaoLei
Author

It is not the same code:

case t: Throwable =>
  logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)

If there is an exception in the spark-dynamic-executor-allocation thread, the error is logged but not rethrown, so the thread can keep driving executor allocation.

@srowen
Member

srowen commented Jul 12, 2015

This does not change the behavior though. Before, the thread would not stop if there was no exception. This doesn't seem to be what your JIRA is about though; you are talking about the exception case. But here, all exception paths lead to the Runnable completing. Can you make this clearer or close this PR / JIRA?

@KaiXinXiaoLei
Author

I mean: if the thread throws an exception, it stops working. So when the new ApplicationMaster starts and no tasks are running, the number of executors does not reduce to the minimum.

To explain:
1. The old ApplicationMaster is dead or disconnected.
2. Tasks are submitted before the new ApplicationMaster starts.
3. The spark-dynamic-executor-allocation thread sends a message to add executors.
4. Because the ApplicationMaster is dead or disconnected, the executors do not start successfully, so the thread logs the error: "Error sending message [message = RequestExecutors(1)] in 1 attempts".
5. After three attempts, the thread throws an exception: "Uncaught exception in thread spark-dynamic-executor-allocation".
6. The new ApplicationMaster starts, and the submitted tasks run.
7. Long after the tasks complete, the number of executors still does not reduce to the minimum, and there is no further log output from this thread (log level is INFO).

@srowen
Member

srowen commented Jul 13, 2015

@KaiXinXiaoLei again, read your code and trace what happens when an exception is thrown from schedule(). The thread run() method either throws an exception, or completes. It stops in either case, like before. So I don't see why you believe this changes the behavior.

@KaiXinXiaoLei KaiXinXiaoLei changed the title [SPARK-8974] The thread of spark-dynamic-executor-allocation not need throw exception [SPARK-8974] Fix a bug in dynamicAllocation. When there is no running tasks, the number of executor is not zero. Jul 14, 2015
@srowen
Member

srowen commented Jul 14, 2015

It is not true that 0 tasks means 0 executors, even with dynamic allocation. Consider a minimum > 0, or ramp-down time, or keeping executors because of cached data.
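(For reference, these are the settings behind the three reasons named above; a hedged SparkConf sketch, with values that are my understanding of the defaults and should be checked against your Spark version:)

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "0")          // lower bound for ramp-down
      .set("spark.dynamicAllocation.executorIdleTimeout", "60s") // idle executors removed after this
      // cachedExecutorIdleTimeout defaults to infinity, so executors holding
      // cached data are never removed unless you lower it, e.g.:
      .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")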

@KaiXinXiaoLei KaiXinXiaoLei changed the title [SPARK-8974] Fix a bug in dynamicAllocation. When there is no running tasks, the number of executor is not zero. [SPARK-8974] Fix a bug in dynamicAllocation. There is a long time without running tasks, the number of executor does not reduce to the value of "spark.dynamicAllocation.minExecutors". Jul 14, 2015
@KaiXinXiaoLei
Author

I set the value of "spark.dynamicAllocation.minExecutors" to 0.

Here is the log for this case:
2015-07-14 19:02:17,461 | WARN | [spark-dynamic-executor-allocation] | Error sending message [message = RequestExecutors(1)] in 1 attempts
2015-07-14 19:04:20,464 | WARN | [spark-dynamic-executor-allocation] | Error sending message [message = RequestExecutors(1)] in 2 attempts
2015-07-14 19:06:23,468 | WARN | [spark-dynamic-executor-allocation] | Error sending message [message = RequestExecutors(1)] in 3 attempts
2015-07-14 19:06:26,471 | ERROR | [spark-dynamic-executor-allocation] | Uncaught exception in thread spark-dynamic-executor-allocation

@srowen
Member

srowen commented Jul 14, 2015

I still don't see why this relates to the problem you describe; see my question above. If we can't make progress discussing the problem, I'd ask that you close the PR.

@vanzin
Contributor

vanzin commented Jul 14, 2015

I think the bug / PR title are just confusingly worded, but the fix is correct. It's because of this note in the ScheduledExecutorService::scheduleAtFixedRate documentation:

If any execution of the task encounters an exception, subsequent executions are suppressed.

So exceptions need to be ignored while the allocation manager is running. logUncaughtExceptions just logs them, but lets the exceptions bubble up (and thus stop the task).

Only thing is that I never know which exception to catch in Scala (is it ControlThrowable or NonFatal(e)?).

I'd use a clearer title for the PR, something like: "Catch exceptions in allocation schedule task." You could explain why that needs to be done in the description, instead of trying to use the title for that.
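(To make the point concrete, a small standalone sketch -- hypothetical demo code, not Spark code -- of the scheduleAtFixedRate behavior quoted above: once one execution throws, the task is never run again, and the exception is only observable through the returned future, which matches the "nothing more from this thread in the log" symptom.)

    import java.util.concurrent.{Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger

    object SuppressionDemo {
      def main(args: Array[String]): Unit = {
        val executor = Executors.newSingleThreadScheduledExecutor()
        val runs = new AtomicInteger(0)

        executor.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = {
            // The third execution throws; per the javadoc, all later executions are suppressed.
            if (runs.incrementAndGet() == 3) throw new RuntimeException("boom")
          }
        }, 0, 100, TimeUnit.MILLISECONDS)

        Thread.sleep(1000)
        println(s"runs = ${runs.get()}") // prints 3, not ~10: the task was silently cancelled
        executor.shutdownNow()
      }
    }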

@srowen
Member

srowen commented Jul 14, 2015

Oh I get it, I'm wrong then. This is not about Runnable.run() stopping but the (Java) Executor stopping the scheduling of the task. I didn't realize an exception in a task cancelled the future schedule, so, now it makes sense that it does matter how run() finishes, not just that it finishes. I'm sorry about that, I see it now.

Can I make a minor suggestion then: change logError to logWarning since this is a recoverable error, apparently.

@KaiXinXiaoLei KaiXinXiaoLei changed the title [SPARK-8974] Fix a bug in dynamicAllocation. There is a long time without running tasks, the number of executor does not reduce to the value of "spark.dynamicAllocation.minExecutors". [SPARK-8974] Catch exceptions in allocation schedule task. Jul 15, 2015
@KaiXinXiaoLei
Author

@vanzin I am sorry my description is not clear. Thanks.
@srowen I have changed logError to logWarning. Thanks.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #1068 has finished for PR 7352 at commit 3603631.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Jul 15, 2015

Author: KaiXinXiaoLei <[email protected]>

Closes #7352 from KaiXinXiaoLei/dym and squashes the following commits:

3603631 [KaiXinXiaoLei] change logError to logWarning
efc4f24 [KaiXinXiaoLei] change file

(cherry picked from commit 674eb2a)
Signed-off-by: Sean Owen <[email protected]>
@srowen
Member

srowen commented Jul 15, 2015

Merged into master and 1.4

@asfgit asfgit closed this in 674eb2a Jul 15, 2015