
[SPARK-4737] Task set manager properly handles serialization errors #3638

Closed

Conversation

@mccheah
Contributor

mccheah commented Dec 8, 2014

To address [SPARK-4737], the handling of serialization errors should not be the DAGScheduler's responsibility. The task set manager now catches the error and aborts the stage.

If the TaskSetManager throws a TaskNotSerializableException, the TaskSchedulerImpl will return an empty list of task descriptions, because no tasks were started. The scheduler should abort the stage gracefully.

Note that I'm not too familiar with this part of the codebase or its place in the overall architecture of the Spark stack. If implementing it this way would have any adverse side effects, please say so loudly.

Our previous attempt at handling unserializable tasks involved sampling a
single task from a task set and attempting to serialize it. If
serialization succeeded, we assumed that all tasks in the task set would
also be serializable.

Unfortunately, this is not always the case. For example, a
ParallelCollectionRDD may have both empty and non-empty partitions: the
empty partitions serialize fine while the non-empty partitions contain
non-serializable objects. This is one of many cases where checking
serializability by sampling breaks down.
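
For concreteness, a minimal sketch of the scenario described above (this is not code from the PR; UnserializableThing and SamplingGapExample are illustrative names, and local mode is assumed):

import org.apache.spark.{SparkConf, SparkContext}

// Not Serializable, so Java serialization rejects instances of it.
class UnserializableThing

object SamplingGapExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("sampling-gap"))
    try {
      // One element spread over four slices: most partitions are empty and would
      // serialize fine, but the partition holding the element fails only when its
      // task is serialized, so sampling a single (empty) partition misses the error.
      val rdd = sc.parallelize(Seq(new UnserializableThing), numSlices = 4)
      rdd.count()  // task serialization for the non-empty partition throws here
    } finally {
      sc.stop()
    }
  }
}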

Previously, when a task serialization exception occurred in the
TaskSchedulerImpl or TaskSetManager, it was not caught and the entire
scheduler would crash. It would restart, but in a bad state.

There's no reason why the stage should not be aborted if any
serialization error occurs when submitting a task set. If any task in a
task set throws an exception upon serialization, the task set manager
informs the DAGScheduler that the stage failed and aborts the stage. The
TaskSchedulerImpl still returns the set of task descriptions that were
successfully submitted, which will be empty in the case of a
serialization error.
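
The shape of the fix, as a hedged standalone sketch (FakeTask, FakeTaskSetManager, and abort are simplified stand-ins, not Spark's real classes; only TaskNotSerializableException mirrors the class added by this patch):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.control.NonFatal

class TaskNotSerializableException(error: Throwable) extends Exception(error)

final case class FakeTask(taskId: Long, payload: Any)

class FakeTaskSetManager(tasks: Seq[FakeTask]) {
  private def serialize(task: FakeTask): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(task.payload)  // throws NotSerializableException for bad payloads
    out.close()
    bytes.toByteArray
  }

  // In Spark this would notify the DAGScheduler that the stage failed.
  private def abort(message: String): Unit = println(s"Aborting stage: $message")

  def resourceOffer(index: Int): Array[Byte] = {
    val task = tasks(index)
    try {
      serialize(task)
    } catch {
      case NonFatal(e) =>
        abort(s"Failed to serialize task ${task.taskId}: $e")
        throw new TaskNotSerializableException(e)
    }
  }
}

object TaskSetManagerSketch {
  def main(args: Array[String]): Unit = {
    val manager = new FakeTaskSetManager(Seq(FakeTask(0L, new Object)))  // java.lang.Object is not serializable
    try {
      manager.resourceOffer(0)
    } catch {
      // The scheduler catches this, records no launched tasks for this task set,
      // and keeps running instead of crashing.
      case _: TaskNotSerializableException => println("Task set skipped; scheduler keeps running.")
    }
  }
}

The key point is that the failure is handled per task set: the stage is aborted, but the scheduler itself stays up.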
@SparkQA

SparkQA commented Dec 8, 2014

Test build #24228 has started for PR 3638 at commit bf5e706.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 8, 2014

Test build #24228 has finished for PR 3638 at commit bf5e706.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class TaskNotSerializableException(error: Throwable) extends Exception(error)

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24228/

@SparkQA

SparkQA commented Dec 8, 2014

Test build #24229 has started for PR 3638 at commit 5f486f4.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 8, 2014

Test build #24229 has finished for PR 3638 at commit 5f486f4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class TaskNotSerializableException(error: Throwable) extends Exception(error)

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24229/

@SparkQA

SparkQA commented Dec 8, 2014

Test build #24230 has started for PR 3638 at commit 94844d7.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 8, 2014

Test build #24230 has finished for PR 3638 at commit 94844d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class TaskNotSerializableException(error: Throwable) extends Exception(error)

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24230/

@mccheah
Contributor Author

mccheah commented Dec 9, 2014

This is ready for further review.

This is the first time I've touched code around the scheduler and written a change whose effects could propagate to the rest of the job-running stack. Let me know if I've made any significant errors here.

@mccheah
Contributor Author

mccheah commented Dec 12, 2014

Hi, it would be appreciated if someone could give this patch some love. Thanks!

@@ -30,7 +30,7 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
   var conf = new SparkConf(false)
 
   override def beforeAll() {
-    _sc = new SparkContext("local", "test", conf)
+    _sc = new SparkContext("local[4]", "test", conf)
Contributor

Why this change?

Contributor Author

Looks like I left out the comment I originally had there...

I wanted to force serialization to occur between threads. Is this not necessary? We explicitly use multiple threads in our own unit tests to reproduce issues like this.

Contributor

This seems fine, I guess. My initial concern was just that this was a change that would impact many / all tests.

launchedTask = true
}
} catch {
case e: TaskNotSerializableException => {
Contributor

I noticed that there was some earlier discussion of this line. I'm digging into this now, but I think it seems a little dangerous to just silently return an empty result without at least logging a warning message.

Contributor

Hmm, it looks like we already log the exception inside of resourceOffer.

Contributor

Actually, I don't think that returning an empty sequence is the right call here, since even though this particular task set might have failed to launch a task, we might still have task sets that can be launched. So, it seems like we'd like to break out of the innermost loop rather than returning from resourceOffers. To do this, it might make sense to split these nested loops into a pair of functions where we can return from the "launch tasks from this particular task set" function.

Contributor Author

My understanding was that if this task set fails, the whole job cannot complete even if other task sets can still be executed. So even if other task sets could be launched, their work would be wasted since the job as a whole cannot finish. Again, this is new territory for me, so I'll follow this lead and test it.

Contributor

What about scenarios where you have multiple concurrent jobs (e.g. in an environment like Databricks Cloud, Spark Jobserver, etc)? I agree that the job associated with this task set is doomed, but other jobs should still be able to make progress and those jobs' task sets might still be schedulable.
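
A hedged, standalone sketch of the behavior being discussed in this thread (illustrative names, not Spark's real scheduler classes): pulling the per-task-set work into its own function lets a serialization failure end scheduling for that task set only, while the outer loop keeps offering resources to the remaining task sets.

import scala.collection.mutable.ArrayBuffer

object SkipFailingTaskSetSketch {
  final case class Offer(executorId: String)
  final case class TaskSet(name: String, serializable: Boolean)

  // Offer resources to a single task set; a failure here only skips this task set.
  def resourceOfferSingleTaskSet(taskSet: TaskSet, offers: Seq[Offer],
                                 launched: ArrayBuffer[String]): Boolean = {
    if (!taskSet.serializable) {
      // In Spark this is a caught TaskNotSerializableException plus a stage abort.
      println(s"Skipping ${taskSet.name}: tasks are not serializable")
      false
    } else {
      offers.foreach(offer => launched += s"${taskSet.name} on ${offer.executorId}")
      offers.nonEmpty
    }
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec-1"), Offer("exec-2"))
    val taskSets = Seq(
      TaskSet("jobA-stage0", serializable = false),  // doomed task set from one job
      TaskSet("jobB-stage0", serializable = true))   // a different job's task set
    val launched = ArrayBuffer.empty[String]
    taskSets.foreach(ts => resourceOfferSingleTaskSet(ts, offers, launched))
    println(launched.mkString(", "))  // jobB-stage0 is still scheduled on both executors
  }
}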

@JoshRosen
Contributor

Hi @mccheah @mingyukim,

Sorry for the late review. This fix looks good overall. I left a few minor style comments. My main feedback is that I don't think that returning an empty sequence in TaskSchedulerImpl is the right way to handle serialization errors (see my diff comment); instead, I think we should skip over task sets that fail and continue trying to schedule other task sets.

@JoshRosen
Contributor

One potential area of concern: can any of the changes here lead to weird re-entrant behavior? I don't think that this will happen, since the DAGScheduler calls end up just queueing messages, but as a note-to-self I may want to just revisit and confirm this before a final sign-off on this PR.

@JoshRosen
Contributor

Ping @mccheah @mingyukim Will you have time to work on this PR? I'd like to try to get this in soon to unblock another PR.

@mccheah
Contributor Author

mccheah commented Jan 5, 2015

Thanks for the reply, I'll address these comments today.

@SparkQA

SparkQA commented Jan 5, 2015

Test build #25064 has started for PR 3638 at commit b2a430d.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 5, 2015

Test build #25064 has finished for PR 3638 at commit b2a430d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25064/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25116/

@JoshRosen
Contributor

Alright, this looks good to me and I'd like to merge it. I'll revise the commit message to more accurately describe the actual change that's being committed. I'm thinking of something like this (incorporating pieces from the JIRA):

Currently, Spark assumes that serialization cannot fail when tasks are serialized in the TaskSetManager. We assume this because upstream, in the DAGScheduler, we attempt to catch any serialization errors by testing whether the first task / partition can be serialized. However, in some cases this upstream test is not sufficient - i.e. an RDD's first partition might be serializable even though other partitions are not.

This patch solves this problem by catching serialization errors at the time that TaskSetManager attempts to launch tasks. If a task fails with a serialization error, TaskSetManager will now abort the task's task set. This prevents uncaught serialization errors from crashing the DAGScheduler.

@mccheah
Contributor Author

mccheah commented Jan 7, 2015

Sounds good to me. I'm curious to hear other opinions but if nothing comes up then merging is okay.

val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
def resourceOfferSingleTaskSet(taskSet: TaskSetManager, maxLocality: TaskLocality) : Unit = {
Contributor

no space before :

@SparkQA

SparkQA commented Jan 9, 2015

Test build #25281 has started for PR 3638 at commit 5267929.

  • This patch merges cleanly.

availableCpus: Array[Int],
tasks: Seq[ArrayBuffer[TaskDescription]])
: Boolean =
{
Contributor

small nit:

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    ...
    tasks: Seq[...]): Boolean = {
  ...
}

@andrewor14
Contributor

@mccheah @JoshRosen high-level question: so what happens now when a task is not serializable? Before, it would throw a loud exception and fail the task, but now we catch the TaskNotSerializableException and simply do not schedule it. I may be missing something, but do we ever abort the stage or fail the task?

@andrewor14
Contributor

Ah never mind, I found the abort here. This patch LGTM.

@SparkQA

SparkQA commented Jan 9, 2015

Test build #25281 has finished for PR 3638 at commit 5267929.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25281/

@SparkQA

SparkQA commented Jan 9, 2015

Test build #25289 has started for PR 3638 at commit 1545984.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 9, 2015

Test build #25289 has finished for PR 3638 at commit 1545984.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25289/

@andrewor14
Contributor

Ok, I'm merging this into master. Thanks!

@asfgit closed this in e0f28e0 on Jan 9, 2015
@mccheah deleted the task-set-manager-properly-handle-ser-err branch on February 11, 2015