
[SPARK-19263] DAGScheduler should avoid sending conflicting task set. #16620

Closed
jinxing64 wants to merge 20 commits into master from the SPARK-19263 branch

Conversation


@jinxing64 jinxing64 commented Jan 17, 2017

What changes were proposed in this pull request?

In the current DAGScheduler handleTaskCompletion code, when event.reason is Success, it first does stage.pendingPartitions -= task.partitionId, which may be a bug when FetchFailed happens.

Consider the scenario below:

  1. Stage 0 runs and generates shuffle output data.
  2. Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA.
  3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The driver marks executorA as lost and updates failedEpoch;
  4. The driver resubmits stage 0 so the missing output can be re-generated, and then once it completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x.
  5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA and sends Success back to driver. This causes DAGScheduler::handleTaskCompletion to remove partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to the set of output locations (line 1192), because the task’s epoch is less than the failure epoch for the executor (because of the earlier failure on executor A); see the sketch after this list.
  6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition 1 from stage.pendingPartitions. Combined with the previous step, this means that there are no more pending partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196). However, the shuffle stage is not available (line 1215) because the completion for ShuffleMapTask2 was ignored because of its epoch, so the DAGScheduler resubmits the stage.
  7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for the re-submitted stage, it throws an error, because there’s an existing active task set
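
For readers following along, here is a small, self-contained toy model of the bookkeeping in steps 5 and 6 (illustrative only: the names mirror the real DAGScheduler fields such as failedEpoch and pendingPartitions, but none of this is Spark's actual code):

```scala
import scala.collection.mutable

// Toy model: pendingPartitions is always cleared on Success, but the map output is
// only registered when the task's epoch is newer than the executor's failure epoch.
object PendingVsOutputLocs {
  val failedEpoch = mutable.Map[String, Long]()   // executorId -> epoch of last known failure
  val pendingPartitions = mutable.Set(0, 1)       // the stage's two partitions
  val outputLocs = mutable.Map[Int, String]()     // partition -> executor holding its output

  def handleSuccess(partition: Int, execId: String, taskEpoch: Long): Unit = {
    pendingPartitions -= partition                // mirrors stage.pendingPartitions -= task.partitionId
    if (failedEpoch.get(execId).exists(taskEpoch <= _)) {
      println(s"ignoring possibly bogus completion of partition $partition from $execId")
    } else {
      outputLocs(partition) = execId              // only a trusted completion registers output
    }
  }

  def main(args: Array[String]): Unit = {
    failedEpoch("executorA") = 1                                       // step 3: executorA marked failed
    handleSuccess(partition = 1, execId = "executorA", taskEpoch = 0)  // step 5: late Success from the old attempt
    handleSuccess(partition = 0, execId = "executorB", taskEpoch = 2)  // step 6: Success from the resubmitted attempt
    // pendingPartitions is now empty, yet one output location is still missing, so the
    // stage is marked finished but not available and gets resubmitted (step 7's conflict).
    println(s"pending = $pendingPartitions, available = ${outputLocs.keySet == Set(0, 1)}")
  }
}
```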

In this fix

Add a check for an already-active (non-zombie) TaskSetManager for the stage before resubmission.
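
A hedged reconstruction of the shape of that check, assembled from the diff excerpts quoted in the review comments below (not the exact patch; note also that the fix that was finally merged took a different approach, described in the closing commit message at the end of this thread):

```scala
// Inside DAGScheduler, before resubmitting the shuffle stage (sketch only):
val activeTaskSetManagerExists =
  taskScheduler.rootPool != null &&
    taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm =>
      tsm.stageId == stageId && !tsm.isZombie
    }

if (!activeTaskSetManagerExists) {
  // No live (non-zombie) TaskSetManager is running tasks for this stage, so
  // resubmitting cannot create a conflicting task set.
  submitStage(shuffleStage)
}
```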

How was this patch tested?

Added a unit test in DAGSchedulerSuite.

@markhamstra
Contributor

Thanks for the work thus far, @jinxing64, but this really needs updated test coverage before we can consider merging it.

@squito

@markhamstra
Contributor

ok to test

@markhamstra
Contributor

Beyond the lack of new tests, this patch is causing a couple of existing DAGSchedulerSuite tests to fail for me locally: "run trivial shuffle with out-of-band failure and retry" and "map stage submission with executor failure late map task completions"

@squito
Contributor

squito commented Jan 17, 2017

Thanks for pointing out this issue, and the nice description. Still looking but sounds like a legitimate issue. I think SchedulerIntegrationSuite should be able to replicate the exact scenario you have outlined for adding a test case. @jinxing64 can you look at adding a test case that way? I can try to help there as well, but will take me a few days to get to it.

@markhamstra
Contributor

Jenkins, test this please

@SparkQA

SparkQA commented Jan 18, 2017

Test build #3540 has finished for PR 16620 at commit 9e4aab2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

@squito
SchedulerIntegrationSuite is very helpful, and I can now reproduce this issue in it.
Fixing this issue is more complicated than I thought; I'll make some changes and add a unit test.

@jinxing64 jinxing64 force-pushed the SPARK-19263 branch 3 times, most recently from d3b6ebb to b20d316 on January 20, 2017 07:53
@jinxing64 jinxing64 changed the title from "[SPARK-19263] DAGScheduler should handle stage's pendingPartitions properly in handleTaskCompletion." to "[SPARK-19263] DAGScheduler should avoid sending conflicting task set." on Jan 20, 2017
@jinxing64
Author

@markhamstra @squito
Thanks a lot for your helpful comments.
I added a unit test for this fix and updated the patch. It now passes all unit tests for me locally.
In this fix: add a check for an already-active (non-zombie) TaskSetManager before resubmission:

  taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm =>
    tsm.stageId == stageId && !tsm.isZombie
  }
} else false
Contributor

The if...else is unnecessary:

val activeTaskSetManagerExist =
  taskScheduler.rootPool != null &&
  taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm =>
    tsm.stageId == stageId && !tsm.isZombie
  }

@@ -1193,7 +1193,15 @@ class DAGScheduler(
}

if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
  markStageAsFinished(shuffleStage)
  val activeTaskSetManagerExist =
Contributor

nit: should be activeTaskSetManagerExists

Contributor

And since it is being used as !activeTaskSetManagerExists, you could reverse the sense, avoid needing the !, and call it something like noActiveTaskSetManager.

@jinxing64
Author

@markhamstra
Thanks a lot for your comment. I've refined it; please take another look ~

@squito
Contributor

squito commented Jan 23, 2017

Jenkins, ok to test

@SparkQA

SparkQA commented Jan 23, 2017

Test build #71874 has finished for PR 16620 at commit be8bfe5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@squito squito left a comment

Thanks @jinxing64 for working on this. I'm sorry that at the moment my comments are mostly critical, without providing very constructive advice. I'll keep thinking about this but I thought I'd share my feedback now.

This is a really important fix, and the work you are doing on it is great -- but it's also tricky enough that I want to make sure we put in a change that improves the clarity of the code and that we feel confident in.

@@ -1218,7 +1225,9 @@ class DAGScheduler(
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.findMissingPartitions().mkString(", "))
submitStage(shuffleStage)
if (noActiveTaskSetManager) {
Contributor

Shouldn't this condition go into the surrounding if (!shuffleStage.isAvailable)? The logInfo is very confusing in this case otherwise.
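
In other words, something like the following restructuring (a sketch of the reviewer's suggestion, not code from the patch):

```scala
if (!shuffleStage.isAvailable) {
  // Only log and resubmit when no non-zombie TaskSetManager is still running tasks
  // for this stage; otherwise the "Resubmitting ..." message would be misleading.
  if (noActiveTaskSetManager) {
    logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
      ") because some of its tasks had failed: " +
      shuffleStage.findMissingPartitions().mkString(", "))
    submitStage(shuffleStage)
  }
}
```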

case (1, 1) =>
  // Wait long enough until Success of task(stageAttempt=1 and partition=0)
  // is handled by DAGScheduler.
  Thread.sleep(5000)
Contributor

Hmm, this is a nuisance. I don't see any good way to get rid of this sleep... but now that I think about it, why can't you do this in DAGSchedulerSuite? It seems like this can be entirely contained to the DAGScheduler and doesn't require tricky interactions with other parts of the scheduler. (I'm sorry I pointed you in the wrong direction earlier -- I thought perhaps you had tried to copy the examples of DAGSchedulerSuite but there was some reason you couldn't.)

assert(results === (0 until 2).map { _ -> 10}.toMap)
}

def waitUntilConditionBecomeTrue(condition: => Boolean, timeout: Long, msg: String): Unit = {
Contributor

nit: rename to waitForCondition (maybe irrelevant given other comments)

}
if (shuffleStage.isAvailable || noActiveTaskSetManager) {
  markStageAsFinished(shuffleStage)
}
Contributor

I have to admit, though this passes all the tests, this is really confusing to me. I only somewhat understand why your original version didn't work, and why this should be used instead. Perhaps some more commenting here would help? The condition under which you do markStageAsFinished seems very broad, so perhaps it's worth a comment on the case when you do not (and perhaps even a logInfo in an else branch). The discrepancy between pendingPartitions and availableOutputs is also surprising -- perhaps that is worth extra comments on Stage, on how the meanings of those two differ.

@jinxing64 jinxing64 force-pushed the SPARK-19263 branch 2 times, most recently from 217aa44 to 3f0ebb8 on January 25, 2017 08:51
@jinxing64
Author

jinxing64 commented Jan 25, 2017

@squito
Thanks a lot for your comments, they are very helpful. I've already refined the code, please take another look : )

In the current ShuffleMapStage, pendingPartitions.size() == 0 doesn't mean the stage is available, because the succeeded task can be bogus and out of date: the task's epoch may be older than the corresponding executor's failed epoch in the DAGScheduler.

When handling Success of a ShuffleMapTask, what I want to do is check whether there are still tasks running for the same stage; if so, do not resubmit when pendingPartitions.isEmpty && !stage.isAvailable. There are two benefits to this:

  1. Success of the running tasks has a chance to update the map status of the ShuffleMapStage and turn it available;
  2. Avoid submitting a conflicting taskSet.

@jinxing64
Author

jinxing64 commented Jan 25, 2017

> Hmm, this is a nuisance. I don't see any good way to get rid of this sleep... but now that I think about it, why can't you do this in DAGSchedulerSuite? It seems like this can be entirely contained to the DAGScheduler and doesn't require tricky interactions with other parts of the scheduler. (I'm sorry I pointed you in the wrong direction earlier -- I thought perhaps you had tried to copy the examples of DAGSchedulerSuite but there was some reason you couldn't.)

@squito
DAGSchedulerSuite is quite hard for me, because this bug happens during the interaction between DAGScheduler and TaskSchedulerImpl; the conflicting-task-set exception is actually thrown in TaskSchedulerImpl when submitTasks is called from the DAGScheduler. DAGSchedulerSuite only provides a very simple TaskScheduler; of course I can check for the conflict in it, but I don't think it is convincing enough.

I don't like the Thread.sleep(5000) either, but I didn't find a better way. I added TestDAGScheduler in SchedulerIntegrationSuite, just like TestTaskScheduler, for tracking more state; perhaps it can also be used in the future. If that is not preferred, I'm sorry.

@SparkQA

SparkQA commented Jan 25, 2017

Test build #71979 has finished for PR 16620 at commit 3f0ebb8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

Failed to pass unit tests. I will keep working on this.

@SparkQA

SparkQA commented Jan 26, 2017

Test build #72023 has finished for PR 16620 at commit be7e701.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
}
Author

@jinxing64 jinxing64 Jan 26, 2017

The Success is handled by the DAGScheduler in a different thread. The DAGScheduler perhaps needs to check the TaskSetManager's status, e.g. isZombie. Moving the code here makes it safe for the DAGScheduler to check the TaskSetManager's status when handling Success.

Contributor

Could this be moved before maybeFinishTaskSet(), if you only need isZombie=true? For performance it's helpful to hand off to the DAGScheduler thread as soon as we can. Probably not a huge impact, but we should try to avoid impacting performance where possible.

Author

@jinxing64 jinxing64 Jan 30, 2017

@squito
Yes, it makes sense to move this part before maybeFinishTaskSet(); I will refine it.

@SparkQA

SparkQA commented Jan 26, 2017

Test build #72028 has finished for PR 16620 at commit de19333.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

@squito
Could you please take another look at this? : )

@jinxing64
Author

@squito
ping for review~~

Contributor

@kayousterhout kayousterhout left a comment

This looks great -- I just left some nits on improving test commenting.

@@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}
}

test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
" even with late completions from earlier stage attempts") {
Contributor

nit: indent two more spaces

taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
"Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
null))
Contributor

can you pass null as a named parameter here? ("parameterName = null")

(Success, makeMapStatus("hostB", 2)),
(Success, makeMapStatus("hostB", 2))))

// Task succeeds on a failed executor. The success is bogus.
Contributor

can you change the 2nd sentence to "The success should be ignored because the task started before the executor failed, so the output may have been lost."

runEvent(makeCompletionEvent(
taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))

assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
Contributor

should the second part have taskSets(3) instead of taskSets(2)?


submit(rddC, Array(0, 1))

assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
Contributor

can you comment this like I suggested in your other PR?

runEvent(makeCompletionEvent(
taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

// There should be no new attempt of stage submitted.
Contributor

can you add "because task 1 is still running in the current attempt (and hasn't completed successfully in any earlier attempts)."

taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

// There should be no new attempt of stage submitted.
assert(taskSets.size === 4)
Contributor

is this the line that would fail without your change? (just verifying my understanding)

Author

Yes, I think so : )

runEvent(makeCompletionEvent(
taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))

// ResultStage submitted.
Contributor

"Now the ResultStage should be submitted, because all of the tasks to generate rddB have completed successfully on alive executors."

@SparkQA

SparkQA commented Feb 14, 2017

Test build #72849 has finished for PR 16620 at commit ab8d13e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

@kayousterhout
I've refined accordingly, please take another look : )

Contributor

@squito squito left a comment

Thanks for the careful analysis and writeup, Kay. This version makes sense to me.

// completed successfully from the perspective of the TaskSetManager, mark it as
// no longer pending (the TaskSetManager may consider the task complete even
// when the output needs to be ignored because the task's epoch is too small below).
shuffleStage.pendingPartitions -= task.partitionId
Contributor

I think it's worth also explaining how this inconsistency between pendingPartitions and outputLocations gets resolved. IIUC, it's that when pendingPartitions is empty, the scheduler will check outputLocations, realize something is missing, and resubmit this stage.
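
For reference, the resolution path being described is roughly the following (a paraphrased sketch of the surrounding DAGScheduler logic, not the exact code):

```scala
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
  // Every task of the current attempt has reported back, so this attempt is done...
  markStageAsFinished(shuffleStage)
  if (!shuffleStage.isAvailable) {
    // ...but some map outputs were never registered (e.g. an ignored "bogus"
    // completion from a failed executor), so the stage has to be submitted again.
    submitStage(shuffleStage)
  }
}
```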

@jinxing64
Author

@squito
Thanks a lot. I've refined the comment, please take another look.

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72912 has finished for PR 16620 at commit e34cd85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72913 has finished for PR 16620 at commit d225565.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
The pendingPartitions instance variable should be moved to ShuffleMapStage,
because it is only used by ShuffleMapStages. This change is purely refactoring
and does not change functionality.

I fixed this in an attempt to clarify some of the discussion around apache#16620, which I was having trouble reasoning about.  I stole the helpful comment Imran wrote for pendingPartitions and used it here.

cc squito markhamstra jinxing64

Author: Kay Ousterhout <[email protected]>

Closes apache#16876 from kayousterhout/SPARK-19537.
// when the output needs to be ignored because the task's epoch is too small below,
// if so, this can result in inconsistency between pending partitions and output
// locations of stage. When pending partitions is empty, the scheduler will check
// output locations, if there is missing, the stage will be resubmitted.
Contributor

one more proposal to improve this comment:

...epoch is too small below. In this case, when pending partitions is empty, there will still be missing output locations, which will cause the DAGScheduler to resubmit the stage below.)

@kayousterhout
Contributor

LGTM pending one last comment improvement

@jinxing64
Author

Yes, refined : )

@SparkQA

SparkQA commented Feb 16, 2017

Test build #72974 has finished for PR 16620 at commit 6809d1f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kayousterhout
Contributor

LGTM! Thanks for finding this subtle bug and all of the hard work to fix it @jinxing64. I'll wait until tomorrow to merge this to give Mark and Imran a chance for any last comments.

@jinxing64
Author

@kayousterhout @squito @markhamstra
Thanks for all of your work for this patch. Really appreciate your help : )

@markhamstra
Contributor

LGTM

@asfgit asfgit closed this in 729ce37 Feb 18, 2017
@kayousterhout
Contributor

Thanks all for the work on this! I've merged this into master.

asfgit pushed a commit that referenced this pull request Feb 24, 2017
This commit improves the tests that check the case when a
ShuffleMapTask completes successfully on an executor that has
failed.  This commit improves the commenting around the existing
test for this, and adds some additional checks to make it more
clear what went wrong if the tests fail (the fact that these
tests are hard to understand came up in the context of markhamstra's
proposed fix for #16620).

This commit also removes a test that I realized tested exactly
the same functionality.

markhamstra, I verified that the new version of the test still fails (and
in a more helpful way) for your proposed change for #16620.

Author: Kay Ousterhout <[email protected]>

Closes #16892 from kayousterhout/SPARK-19560.
Yunni pushed a commit to Yunni/spark that referenced this pull request Feb 27, 2017
xuanyuanking pushed a commit to xuanyuanking/spark that referenced this pull request Oct 19, 2017
dosoft pushed a commit to WANdisco/spark that referenced this pull request Jun 25, 2018
In the current `DAGScheduler handleTaskCompletion` code, when event.reason is `Success`, it will first do `stage.pendingPartitions -= task.partitionId`, which may be a bug when `FetchFailed` happens.

**Consider the scenario below**

1.  Stage 0 runs and generates shuffle output data.
2. Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA.
3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The driver marks executorA as lost and updates failedEpoch;
4. The driver resubmits stage 0 so the missing output can be re-generated, and then once it completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x.
5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA and sends Success back to driver. This causes DAGScheduler::handleTaskCompletion to remove partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to the set of output locations (line 1192), because the task’s epoch is less than the failure epoch for the executor (because of the earlier failure on executor A)
6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition 1 from stage.pendingPartitions. Combined with the previous step, this means that there are no more pending partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196). However, the shuffle stage is not available (line 1215) because the completion for ShuffleMapTask2 was ignored because of its epoch, so the DAGScheduler resubmits the stage.
7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for the re-submitted stage, it throws an error, because there’s an existing active task set

**In this fix**

If a task completion is from a previous stage attempt and the epoch is too low
(i.e., it was from a failed executor), don't remove the corresponding partition
from pendingPartitions.

Author: jinxing <[email protected]>
Author: jinxing <[email protected]>

Closes apache#16620 from jinxing64/SPARK-19263.
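
To make the merged behavior concrete, here is a hedged sketch of the resulting Success handling for a ShuffleMapTask (paraphrased from the code comments quoted in the review above; not the exact merged diff):

```scala
// Paraphrase of DAGScheduler.handleTaskCompletion after this change (sketch only):
case smt: ShuffleMapTask =>
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId

  if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
    // The task belongs to the attempt that is currently running, so the TaskSetManager
    // considers it complete; mark the partition as no longer pending even if the output
    // below ends up ignored. In that case output locations stay incomplete and the
    // stage is resubmitted once pendingPartitions empties out.
    shuffleStage.pendingPartitions -= task.partitionId
  }

  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
  } else {
    // The epoch is new enough, so the output can be trusted: register the location and
    // clear the pending partition (this also covers late completions from earlier
    // stage attempts, which were skipped by the check above).
    shuffleStage.addOutputLoc(smt.partitionId, status)
    shuffleStage.pendingPartitions -= task.partitionId
  }
```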