[SPARK-19263] DAGScheduler should avoid sending conflicting task set. #16620
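For context on what "conflicting task sets" means here: the task scheduler refuses to run two non-zombie TaskSets for the same stage at once, so a DAGScheduler that submits a second attempt while an earlier one is still active triggers an error. A simplified, self-contained model of that check (field and method names are illustrative paraphrases, not Spark's exact API):

```scala
// Simplified model of the conflicting-TaskSet check in the task scheduler.
// Names are illustrative; this is a sketch, not Spark's verbatim code.
case class TaskSetModel(stageId: Int, stageAttemptId: Int, var isZombie: Boolean = false)

class SchedulerModel {
  private val active = scala.collection.mutable.Buffer.empty[TaskSetModel]

  def submitTasks(taskSet: TaskSetModel): Unit = {
    // A second non-zombie TaskSet for the same stage is a conflict.
    val conflicting = active.exists(ts =>
      ts.stageId == taskSet.stageId && !ts.isZombie)
    if (conflicting) {
      // Spark raises an IllegalStateException in this situation
      // ("more than one active taskSet for stage ...").
      throw new IllegalStateException(
        s"more than one active taskSet for stage ${taskSet.stageId}")
    }
    active += taskSet
  }
}
```

The test added in this PR exercises exactly this invariant: a late completion from an earlier stage attempt must not cause the DAGScheduler to submit a duplicate active attempt.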
Changes from 16 commits
@@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
    }
  }

  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    " even with late completions from earlier stage attempts") {

Review comment: nit: indent two more spaces

    val rddA = new MyRDD(sc, 2, Nil)
    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    val shuffleIdA = shuffleDepA.shuffleId

    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))

    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)

    submit(rddC, Array(0, 1))

    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)

Review comment: can you comment this like I suggested in your other PR?

    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostA", 2))))

    // Fetch failed on hostA.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0),
      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
      null))

Review comment: can you pass null as a named parameter here? ("parameterName = null")

    scheduler.resubmitFailedStages()

    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
    complete(taskSets(2), Seq(
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostB", 2))))

    // Task succeeds on a failed executor. The success is bogus.

Review comment: can you change the 2nd sentence to "The success should be ignored because the task started before the executor failed, so the output may have been lost."

    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))

    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)

Review comment: should the second part have taskSets(3) instead of taskSets(2)?

    runEvent(makeCompletionEvent(
      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

    // There should be no new attempt of stage submitted.

Review comment: can you add "because task 1 is still running in the current attempt (and hasn't completed successfully in any earlier attempts)."

    assert(taskSets.size === 4)

Review comment: is this the line that would fail without your change? (just verifying my understanding)
Reply: Yes, I think so :)

    runEvent(makeCompletionEvent(
      taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))

    // ResultStage submitted.

Review comment: "Now the ResultStage should be submitted, because all of the tasks to generate rddB have completed successfully on alive executors."

    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
    complete(taskSets(4), Seq(
      (Success, 1),
      (Success, 1)))
  }

  /**
   * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
   * Note that this checks only the host and not the executor ID.
Review comment: I think it's also worth explaining how this inconsistency between pendingPartitions and outputLocations gets resolved. IIUC, when pendingPartitions is empty, the scheduler will check outputLocations, realize something is missing, and resubmit this stage.
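The resolution described in this comment can be sketched with a toy model (this is a simplified illustration with made-up names, not Spark's actual implementation): `pendingPartitions` tracks tasks the scheduler still expects to hear from in the current attempt, while `outputLocations` records which partitions have usable map output on a live executor. A late completion from a dead executor drains `pendingPartitions` without registering output, and the mismatch is caught only when the attempt's pending set empties:

```scala
// Toy model of the pendingPartitions / outputLocations bookkeeping.
// All names here are illustrative, not Spark's real fields or methods.
class ModelShuffleStage(numPartitions: Int) {
  val pendingPartitions = scala.collection.mutable.HashSet(0 until numPartitions: _*)
  val outputLocations = scala.collection.mutable.HashMap.empty[Int, String] // partition -> host

  // A completion always removes the partition from the pending set, but the
  // output is registered only if it landed on an executor that is still alive.
  def handleCompletion(partition: Int, host: String, hostAlive: Boolean): Unit = {
    pendingPartitions -= partition
    if (hostAlive) outputLocations(partition) = host
  }

  // Once nothing is pending, the scheduler compares against outputLocations:
  // missing output means the stage is resubmitted as a fresh attempt, rather
  // than a second attempt being started while tasks are still in flight.
  def nextAction(): String =
    if (pendingPartitions.nonEmpty) "wait"
    else if (outputLocations.size < numPartitions) "resubmit stage"
    else "mark stage complete"
}
```

In the test above, the bogus success for `taskSets(1).tasks(1)` corresponds to the `hostAlive = false` branch: the partition stops being pending, but its output is never registered, so the stage is resubmitted once, and only once, when the current attempt finishes.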