-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19631][CORE] OutputCommitCoordinator should not allow commits for already failed tasks #16959
[SPARK-19631][CORE] OutputCommitCoordinator should not allow commits for already failed tasks #16959
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
private type StageId = Int | ||
private type PartitionId = Int | ||
private type TaskAttemptNumber = Int | ||
private case class StageState(authorizedCommitters: Array[TaskAttemptNumber]) { | ||
val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() | ||
} | ||
|
||
private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1 | ||
|
||
/** | ||
* Map from active stage's id => partition id => task attempt with exclusive lock on committing | ||
* output for that partition. | ||
* Map from active stage's id => authorized task attempts for each partition id, which hold an | ||
* exclusive lock on committing task output for that partition, as well as any known failed | ||
* attempts in the stage. | ||
* | ||
* Entries are added to the top-level map when stages start and are removed when they finish | ||
* (either successfully or unsuccessfully). | ||
* | ||
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance. | ||
*/ | ||
private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]() | ||
private val stageStates = mutable.Map[StageId, StageState]() | ||
|
||
/** | ||
* Returns whether the OutputCommitCoordinator's internal data structures are all empty. | ||
*/ | ||
def isEmpty: Boolean = { | ||
authorizedCommittersByStage.isEmpty | ||
stageStates.isEmpty | ||
} | ||
|
||
/** | ||
|
@@ -111,13 +115,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
val arr = new Array[TaskAttemptNumber](maxPartitionId + 1) | ||
java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER) | ||
synchronized { | ||
authorizedCommittersByStage(stage) = arr | ||
stageStates(stage) = new StageState(arr) | ||
} | ||
} | ||
|
||
// Called by DAGScheduler | ||
private[scheduler] def stageEnd(stage: StageId): Unit = synchronized { | ||
authorizedCommittersByStage.remove(stage) | ||
stageStates.remove(stage) | ||
} | ||
|
||
// Called by DAGScheduler | ||
|
@@ -126,7 +130,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
partition: PartitionId, | ||
attemptNumber: TaskAttemptNumber, | ||
reason: TaskEndReason): Unit = synchronized { | ||
val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, { | ||
val stageState = stageStates.getOrElse(stage, { | ||
logDebug(s"Ignoring task completion for completed stage") | ||
return | ||
}) | ||
|
@@ -137,10 +141,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " + | ||
s"attempt: $attemptNumber") | ||
case otherReason => | ||
if (authorizedCommitters(partition) == attemptNumber) { | ||
// Mark the attempt as failed to blacklist from future commit protocol | ||
stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber | ||
if (stageState.authorizedCommitters(partition) == attemptNumber) { | ||
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " + | ||
s"partition=$partition) failed; clearing lock") | ||
authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER | ||
stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER | ||
} | ||
} | ||
} | ||
|
@@ -149,7 +155,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
if (isDriver) { | ||
coordinatorRef.foreach(_ send StopCoordinator) | ||
coordinatorRef = None | ||
authorizedCommittersByStage.clear() | ||
stageStates.clear() | ||
} | ||
} | ||
|
||
|
@@ -158,13 +164,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
stage: StageId, | ||
partition: PartitionId, | ||
attemptNumber: TaskAttemptNumber): Boolean = synchronized { | ||
authorizedCommittersByStage.get(stage) match { | ||
case Some(authorizedCommitters) => | ||
authorizedCommitters(partition) match { | ||
stageStates.get(stage) match { | ||
case Some(state) if attemptFailed(stage, partition, attemptNumber) => | ||
logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," + | ||
s" partition=$partition as task attempt $attemptNumber has already failed.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you fix the indentation here? (should be +2 spaces) |
||
false | ||
case Some(state) => | ||
state.authorizedCommitters(partition) match { | ||
case NO_AUTHORIZED_COMMITTER => | ||
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " + | ||
s"partition=$partition") | ||
authorizedCommitters(partition) = attemptNumber | ||
state.authorizedCommitters(partition) = attemptNumber | ||
true | ||
case existingCommitter => | ||
// Coordinator should be idempotent when receiving AskPermissionToCommit. | ||
|
@@ -181,11 +191,20 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
} | ||
} | ||
case None => | ||
logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" + | ||
s"partition $partition to commit") | ||
logDebug(s"Stage $stage has completed, so not allowing" + | ||
s" attempt number $attemptNumber of partition $partition to commit") | ||
false | ||
} | ||
} | ||
|
||
private def attemptFailed( | ||
stage: StageId, | ||
partition: PartitionId, | ||
attempt: TaskAttemptNumber): Boolean = synchronized { | ||
stageStates.get(stage).exists { state => | ||
state.failures.get(partition).exists(_.contains(attempt)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: the one place this is called, you've already looked up the state, so you could take the `StageState` as a parameter instead of looking it up again. |
||
} | ||
} | ||
} | ||
|
||
private[spark] object OutputCommitCoordinator { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -195,6 +195,17 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { | |
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _, | ||
0 until rdd.partitions.size) | ||
} | ||
|
||
test("SPARK-19631: Do not allow failed attempts to be authorized for committing") { | ||
val stage: Int = 1 | ||
val partition: Int = 1 | ||
val failedAttempt: Int = 0 | ||
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1) | ||
outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt, | ||
reason = ExecutorLostFailure("0", exitCausedByApp = true, None)) | ||
assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just realized there is still a potential race here -- what if But what if the earlier attempt had committed? I think its OK for it to let another task commit its output over the original output. (If it didn't, then we'd be back to the original scenario, with all future tasks failing b/c they couldn't commit their output.) If that reasoning sounds correct, I think there should also be a test case where There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think there's a really tricky race here that I don't know if Spark is even able to fix. Basically:
Depending on how those output files are generated, you may either end up with corrupt output (i.e. output files from both executors) or task failures with partial output (E2 would fail to commit, you'd have some output from E1 in the final directory, task would be retried). I believe that both tasks in this case would have the same set of output files, so one would overwrite the other; so the real problem is if E1 "wins" the race but leaves incomplete output around. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW this is very unlikely to happen with filesystems that have atomic moves (or close enough to that), such as HDFS. If all attempts have the same set of outputs, you'd end up with the correct output eventually. It might be trickier to reason about with filesystems that don't have that feature (hello S3), depending on the actual semantics of how (and if) tasks fail. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah - this should be fine and is how it currently works as well. If a failure comes in after the authorization, then the task may or may not commit before failure. The coordinator will release the authorization once it gets the failure, and the next task attempt will check and possibly delete any existing data left over while attempting its own commit. Happy to add the test though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that seems accurate. 
In my example above, this change wouldn't change anything, since it's solving a different race (driver would not allow E1 to commit if it thinks it's dead). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry if I am being really dense, but it still seems to me like in this particular scenario, we're taking broken behavior, fixing it in some cases, and making it worse in others. suppose E1 got permission to commit, then lost connectivity to the driver (or missed heartbeats etc), but continued to try to commit. then E2 asks to commit. Before, we might have ended up with an infinite loop, where E1 never finishes committing, and E2 never gets to commit. Similarly, all future attempts don't get to commit, but we don't even fail the task set because taskcommitdenied doesn't count towards failing a taskset, so it just keeps retrying. After this change, E2 gets to commit immediately after E1 loses connectivity to the driver. E1 may or may not commit at any time. If E1 doesn't commit, great. If E1 does commit, then in most scenarios, things will still be fine. But sometimes, the two commits will stomp on each other. so we've narrowed the scenarios with incorrect behavior -- but the behavior has gone from an infinite loop (bad), to jobs appearing to succeed when they have actually written corrupt data (worse, IMO). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @squito This PR does not affect anything after E1 gets permission to commit, the race you describe is definitely possible and has existed before. This change here only makes it such that if E1 fails before asking to commit, then it is blacklisted from being authorized to commit. There was a problem hiding this comment. 
Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, I finally get it. I was thinking this change was doing something different. Sorry it took me a while. That said, I realize there is another issue here. I tried to update the test to confirm that no other task would commit once there was an executor failure, like so: test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
val stage: Int = 1
val partition: Int = 1
val failedAttempt: Int = 0
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt,
reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
// if we get a request to commit after we learn the executor failed, we don't authorize
// the task to commit, so another attempt can commit.
assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1))
// but if we get an executor failure *after* we authorize a task to commit, we never let
// another task commit. Unfortunately, we just don't know what the status is of the first task,
// so we can't safely let any other task proceed.
outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt + 1,
reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 2))
}

This test fails at the final assert, because the executor failure does clear the authorized committer. But this PR doesn't change that at all -- an equivalent check in master would also fail. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
||
assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1)) | ||
} | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah sorry now that I see this I realized it probably makes sense to initialize `arr` in the `StageState` constructor too (so this line would look like `new StageState(maxPartitionId + 1)`, and the `StageState` constructor just takes in `numPartitions`). Would you mind making that change too?

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep sure thing, just pushed up the change.