Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19631][CORE] OutputCommitCoordinator should not allow commits for already failed tasks #16959

Closed
wants to merge 7 commits into from

Conversation

pwoody
Copy link

@pwoody pwoody commented Feb 16, 2017

What changes were proposed in this pull request?

Previously it was possible for there to be a race between a task failure and committing the output of a task. For example, the driver may mark a task attempt as failed due to an executor heartbeat timeout (possibly due to GC), but the task attempt actually ends up coordinating with the OutputCommitCoordinator once the executor recovers and committing its result. This will lead to any retry attempt failing because the task result has already been committed despite the original attempt failing.

This ensures that any previously failed task attempts cannot enter the commit protocol.

How was this patch tested?

Added a unit test

@pwoody
Copy link
Author

pwoody commented Feb 17, 2017

@JoshRosen @mccheah to review

Copy link
Contributor

@ash211 ash211 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good @pwoody -- I can't imagine a case where we want the coordinator to authorize a task to commit that just came back as failed.

@@ -48,25 +48,28 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private type StageId = Int
private type PartitionId = Int
private type TaskAttemptNumber = Int
private case class StageState(authorizedCommitters: Array[TaskAttemptNumber],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you put a comment here that the index into the authorizedCommitters array is the partitionId ?

@vanzin
Copy link
Contributor

vanzin commented Feb 22, 2017

ok to test

@SparkQA
Copy link

SparkQA commented Feb 22, 2017

Test build #73294 has finished for PR 16959 at commit b0ac2a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ash211
Copy link
Contributor

ash211 commented Feb 24, 2017

@vanzin are you right person to review this?

Copy link
Contributor

@vanzin vanzin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand the explanation of the race in the commit message? It's not clear how the race can happen with just what you wrote. The bug adds a little bit of info but it'd be better to have it properly explained here.

@@ -48,25 +48,28 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private type StageId = Int
private type PartitionId = Int
private type TaskAttemptNumber = Int
private case class StageState(authorizedCommitters: Array[TaskAttemptNumber],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter indentation is incorrect. See "Indentation" section at http://spark.apache.org/contributing.html

@@ -137,10 +141,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
s"attempt: $attemptNumber")
case otherReason =>
if (authorizedCommitters(partition) == attemptNumber) {
// Mark the attempt as failed to blacklist from future commit protocol
stageState.failures.get(partition) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better:

stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber

false
}
}

private def attemptFailed(stage: StageId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same about parameter indentation.

authorizedCommitters(partition) match {
stageStates.get(stage) match {
case Some(state) if attemptFailed(stage, partition, attemptNumber) =>
logWarning(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning seems a little strong here; maybe info.

@pwoody
Copy link
Author

pwoody commented Feb 27, 2017

Thanks for the feedback @vanzin , I've updated the PR and the description

@SparkQA
Copy link

SparkQA commented Feb 27, 2017

Test build #73532 has started for PR 16959 at commit ed3ab09.

@vanzin
Copy link
Contributor

vanzin commented Feb 28, 2017

Looks ok to me, but let me ping some others @squito @kayousterhout

Copy link
Contributor

@kayousterhout kayousterhout left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left one small comment inline, but other than that, this looks good.

@squito This commit makes me worried there are more bugs related to #16620. For example, what if a task was OK'ed to commit, but then DAGScheduler decides to ignore it because of the epoch. The DAGScheduler / TaskSetManager will attempt to re-run the task, but the output commit will never be OK'ed, which will cause the task to fail a bunch of times and the stage to get aborted. Maybe this is OK because it's unlikely a stage will both be a shuffle map stage and also save output to HDFS? Thoughts?

@@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private type StageId = Int
private type PartitionId = Int
private type TaskAttemptNumber = Int
private case class StageState(
authorizedCommitters: Array[TaskAttemptNumber],
failures: mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not define failures as an member variable (and initialize it there with an empty map), rather than forcing the caller to pass in an empty map?

@@ -111,13 +115,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
synchronized {
authorizedCommittersByStage(stage) = arr
stageStates(stage) = new StageState(arr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah sorry now that I see this I realized it probably makes sense to initialize arr in the StageState constructor too (so this line would look like new StageState(maxPartitionId +1), and the StageState constructor just takes in numPartitions). Would you mind making that change too?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep sure thing, just pushed up the change.

Copy link
Contributor

@kayousterhout kayousterhout left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM assuming tests pass. Let's wait to see if @squito has any comments here before merging.

@SparkQA
Copy link

SparkQA commented Feb 28, 2017

Test build #73544 has finished for PR 16959 at commit 20f028a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 28, 2017

Test build #73547 has finished for PR 16959 at commit 1fcbd5d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@squito squito left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

partition: PartitionId,
attempt: TaskAttemptNumber): Boolean = synchronized {
stageStates.get(stage).exists { state =>
state.failures.get(partition).exists(_.contains(attempt))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: the one place this is called, you've already looked up the state. so you could take the StageState instead of the StageId (or even just inline the function entirely, though I think it may be a bit easier to understand as a helper method).

@squito
Copy link
Contributor

squito commented Feb 28, 2017

@kayousterhout

This commit makes me worried there are more bugs related to #16620. For example, what if a task was OK'ed to commit, but then DAGScheduler decides to ignore it because of the epoch. The DAGScheduler / TaskSetManager will attempt to re-run the task, but the output commit will never be OK'ed, which will cause the task to fail a bunch of times and the stage to get aborted. Maybe this is OK because it's unlikely a stage will both be a shuffle map stage and also save output to HDFS? Thoughts?

yes, I think you are right, both about the bug, and that its pretty unlikely. It looks like SparkHadoopMapRedUtil is a public class, so a user could write to hdfs inside a map stage, but that would be pretty weird.

outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt,
reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized there is still a potential race here -- what if canCommit happens, and then the Executor dies? we don't know if the output has been written or not. This change allows another task to commit its output. That is good if the first task hadn't ever written its output.

But what if the earlier attempt had committed? I think its OK for it to let another task commit its output over the original output. (If it didn't, then we'd be back to the original scenario, with all future tasks failing b/c they couldn't commit their output.)

If that reasoning sounds correct, I think there should also be a test case where canCommit() comes before the task failure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what if the earlier attempt had committed?

I think there's a really tricky race here that I don't know if Spark is even able to fix. Basically:

  • E1 starts to commit
  • E1 loses connectivity with the driver, still committing
  • E2 gets permission to commit from driver
  • both E1 and E2 are committing their output files

Depending on how those output files are generated, you may either end up with corrupt output (i.e. output files from both executors) or task failures with partial output (E2 would fail to commit, you'd have some output from E1 in the final directory, task would be retried).

I believe that both tasks in this case would have the same set of output files, so one would overwrite the other; so the real problem is if E1 "wins" the race but leaves incomplete output around.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW this is very unlikely to happen with filesystems that have atomic moves (or close enough to that), such as HDFS. If all attempts have the same set of outputs, you'd end up with the correct output eventually.

It might be trickier to reason about with filesystems that don't have that feature (hello S3), depending on the actual semantics of how (and if) tasks fail.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - this should be fine and is how it currently works as well.

If a failure comes in after the authorization, then the task may or may not commit before failure. The coordinator will release the authorization once it gets the failure, and the next task attempt will check and possibly delete any existing data left over while attempting its own commit.

Happy to add the test though.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry that was in response to @squito

@vanzin Task attempt 1 could very well remove task attempt 2s output even after the second attempt has reported success back to the driver and then get forcibly killed during its own commit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that seems accurate. In my example above, this change wouldn't change anything, since it's solving a different race (driver would not allow E1 to commit if it thinks it's dead).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry if I am being really dense, but it still seems to me like in this particular scenario, we're taking broken behavior, fixing it in some cases, and making it worse in others.

suppose E1 got permission to commit, then lost connectivity to the driver (or missed heartbeats etc), but continued to try to commit. then E2 asks to commit.

Before, we might have ended up with an infinite loop, where E1 never finishes committing, and E2 never gets to commit. Similarly, all future attempts don't get to commit, but we don't even fail the task set because taskcommitdenied doesn't count towards failing a taskset, so it just keeps retrying.

After this change, E2 gets to commit immediately after E1 loses connectivity to the driver. E1 may or may not commit at any time. If E1 doesn't commit, great. If E1 does commit, then in most scenarios, things will still be fine. But sometimes, the two commits will stomp on each other.

so we've narrowed the scenarios with incorrect behavior -- but the behavior has gone from an infinite loop (bad), to jobs appearing to succeed when they have actually written corrupt data (worse, IMO).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@squito This PR does not affect anything after E1 gets permission to commit, the race you describe is definitely possible and has existed before. This change here only makes it such that if E1 fails before asking to commit, then it is blacklisted from being authorized to commit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I finally get it. I was thinking this change was doing something different. Sorry it took me a while.

That said, I realize there is another issue here. I tried to update the test to confirm that no other task would commit once there was an executor failure, like so:

  test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
    val stage: Int = 1
    val partition: Int = 1
    val failedAttempt: Int = 0
    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
    outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt,
      reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
    // if we get a request to commit after we learn the executor failed, we don't authorize
    // the task to commit, so another attempt can commit.
    assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
    assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1))
    // but if we get an executor failure *after* we authorize a task to commit, we never let
    // another task commit.  Unfortunately, we just don't know what the status is of the first task,
    // so we can't safely let any other task proceed.
    outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt + 1,
      reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
    assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 2))
  }

This test fails at the final assert, because the executor failure does clear the authorized commiter. But this PR doesn't change that at all -- an equivalent check in master would also fail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SparkQA
Copy link

SparkQA commented Mar 1, 2017

Test build #73613 has finished for PR 16959 at commit 7d12b78.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Copy link
Contributor

vanzin commented Mar 1, 2017

retest this please

@SparkQA
Copy link

SparkQA commented Mar 1, 2017

Test build #73629 has finished for PR 16959 at commit 7d12b78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Copy link
Contributor

squito commented Mar 1, 2017

lgtm, sorry for the noise

stageStates.get(stage) match {
case Some(state) if attemptFailed(state, partition, attemptNumber) =>
logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," +
s" partition=$partition as task attempt $attemptNumber has already failed.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you fix the indentation here? (should be +2 spaces)

@SparkQA
Copy link

SparkQA commented Mar 2, 2017

Test build #73774 has finished for PR 16959 at commit 9e3c8fe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ash211
Copy link
Contributor

ash211 commented Mar 2, 2017

Any last changes before merging?

@kayousterhout
Copy link
Contributor

kayousterhout commented Mar 2, 2017

LGTM -- I merged this into master. Thanks for fixing this @pwoody!

@asfgit asfgit closed this in 433d9eb Mar 2, 2017
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018
…for already failed tasks

## What changes were proposed in this pull request?

Previously it was possible for there to be a race between a task failure and committing the output of a task. For example, the driver may mark a task attempt as failed due to an executor heartbeat timeout (possibly due to GC), but the task attempt actually ends up coordinating with the OutputCommitCoordinator once the executor recovers and committing its result. This will lead to any retry attempt failing because the task result has already been committed despite the original attempt failing.

This ensures that any previously failed task attempts cannot enter the commit protocol.

## How was this patch tested?

Added a unit test

Author: Patrick Woody <[email protected]>

Closes apache#16959 from pwoody/pw/recordFailuresForCommitter.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants