
Os/commit denied race condition #94

Closed
wants to merge 9 commits into from

Conversation

onursatici

To prevent the race condition when the executor gets preempted after being authorized to commit.


@ash211 ash211 left a comment


This seems pretty reasonable to handle the timeouts portion.

I think we need the two-phase though so we're more resilient to the problems I laid out with speculation. One problem of that though is that it's an extra round-trip on every commit.

@@ -49,6 +52,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private type TaskAttemptNumber = Int

private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
// TODO: get below from config?
private val MAX_WAIT_FOR_COMMIT = 120000L

Yes, this should come from config, and the name should specify what units this number is in.

logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition")
authorizedCommitters(partition) = attemptNumber
authorizedCommitters(partition) = CommitState(
attemptNumber, System.currentTimeMillis())

nit: this should fit in one line

case CommitState(existingCommitter, startTime)
if System.currentTimeMillis() - startTime > MAX_WAIT_FOR_COMMIT =>
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition; maxWaitTime reached for attempId=$existingCommitter")

include maxWaitTime config setting and how the time that's progressed has exceeded that threshold

this should be warn level since a lock expired on a committer -- mention something about prior lock being expired too

typo: attempId -> attemptId

private case object StopCoordinator extends OutputCommitCoordinationMessage
private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)

private case class CommitState(attempt: Int, time: Long)

should have a third entry here for whether the committer returned back that it completed the commit. This lets us distinguish between a repeated request on the same partition that should be authorized (because the prior one timed out) vs shouldn't be authorized (because the prior one completed successfully).


this two-phase commit would defend against issues with speculation (and unintentional speculation due to network partition) where attempt 1 starts, attempt 2 starts, attempt 1 starts to commit, attempt 1 commits, **attempt 1 reports that it committed**, attempt 2 starts to commit

The bold step is required so that in the "attempt 2 starts to commit" step the OCC is able to deny the commit attempt
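The tri-state bookkeeping this review thread converges on could be sketched roughly as follows. All names here are hypothetical illustrations of the idea, not the PR's exact code: recording whether the authorized committer already reported success is what lets the coordinator deny attempt 2 even after attempt 1's lock would otherwise have timed out.

```scala
// Hypothetical sketch of an OutputCommitCoordinator-style state machine.
object CommitStatus extends Enumeration {
  val Uncommitted, MidCommit, Committed = Value
}

case class CommitState(attempt: Int, startTimeNanos: Long, status: CommitStatus.Value)

class Coordinator(maxWaitNanos: Long) {
  private val states = scala.collection.mutable.Map[Int, CommitState]()

  // `now` is passed in to keep the sketch deterministic for testing.
  def canCommit(partition: Int, attempt: Int, now: Long): Boolean =
    states.get(partition) match {
      case Some(CommitState(_, _, CommitStatus.Committed)) =>
        false // a prior attempt finished its commit; never re-authorize
      case Some(CommitState(_, start, CommitStatus.MidCommit))
          if now - start <= maxWaitNanos =>
        false // another attempt holds an unexpired lock
      case _ =>
        // No holder, or the prior lock expired: take the lock.
        states(partition) = CommitState(attempt, now, CommitStatus.MidCommit)
        true
    }

  // Phase two: the committer reports back that the commit completed.
  def commitDone(partition: Int, attempt: Int): Unit =
    states.get(partition).foreach { s =>
      if (s.attempt == attempt) {
        states(partition) = s.copy(status = CommitStatus.Committed)
      }
    }
}
```

With only a timeout and no `Committed` state, attempt 2 in the scenario above would be authorized once the lock aged out, producing a double commit; the reported completion is what closes that window.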

s"attemptNumber=$attemptNumber to commit for stage=$stage, partition=$partition; " +
s"existingCommitter = $existingCommitter. This can indicate dropped network traffic.")
case CommitState(existingCommitter, startTime)
if System.currentTimeMillis() - startTime > MAX_WAIT_FOR_COMMIT =>

need to think through what happens when currentTimeMillis() goes backwards, like it does every few years for leap seconds. Does this code handle that ok?

true
case existingCommitter =>
case CommitState(existingCommitter, _) =>

add another case in here so we can get better logging for commit attempts where another attempt has the lock:

case CommitState(existingCommitter, startTime) =>
    logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
        s"partition=$partition; existingCommitter = $existingCommitter with startTime=$startTime and currentTime=${System.currentTimeMillis()}")
    false

@ash211

ash211 commented Jan 14, 2017

I chatted with @onursatici offline and he's planning to address these comments on Monday


@robert3005 robert3005 left a comment


I think we should use nanoTime instead of currentTimeMillis

s"attemptNumber=$attemptNumber to commit for stage=$stage, partition=$partition; " +
s"existingCommitter = $existingCommitter. This can indicate dropped network traffic.")
case CommitState(existingCommitter, _, Committed) =>
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +


can you make it warn? info? Seems like useful information if you're analyzing failures

@@ -25,8 +25,17 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv

private sealed trait OutputCommitCoordinationMessage extends Serializable



unnecessary whitespace

s"partition=$partition; it is already committed")
false
case CommitState(existingCommitter, startTime, Committing)
if System.currentTimeMillis() - startTime > MAX_WAIT_FOR_COMMIT =>


This can lead to unexpected issues. System.currentTimeMillis isn't guaranteed to be consistent across calls, and in case of NTP adjustment this can cause unnecessary delays in releasing the lock here. Since this is only ever called from one machine for the lifetime of a job, this should be System.nanoTime everywhere to guarantee monotonically increasing results.
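The monotonic-clock version this comment argues for could look roughly like the sketch below (class and method names are illustrative, not from the PR). `System.nanoTime()` is monotonic within a single JVM, so the elapsed-time difference can never go negative when NTP steps the wall clock, whereas `System.currentTimeMillis()` offers no such guarantee.

```scala
// Hypothetical sketch of a lock-expiry check on the monotonic clock.
class CommitLock(maxWaitNanos: Long) {
  private val startNanos: Long = System.nanoTime()

  // Comparing nanoTime values via subtraction is the overflow-safe idiom.
  def expired(): Boolean = System.nanoTime() - startNanos > maxWaitNanos
}
```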

@robert3005

Spoke offline. Seems everything in Spark uses System.currentTimeMillis, so we can leave this as is. Worst case scenario we hold the lock for longer than the setting.

@ash211

ash211 commented Jan 18, 2017

Upstream added a (partial) fix for this and some tests at the PR linked from https://issues.apache.org/jira/browse/SPARK-18113. Maybe we can use that as a base for adding tests here?


@ash211 ash211 left a comment


Looks good @onursatici ! I put a bunch of nitpicky things here but I think this is the direction we want to be going.

Can you also look at the PR linked from https://issues.apache.org/jira/browse/SPARK-18113 and the test that it added? I'm hoping we can use those tests to make sure that this is working properly as well.

// Timeout to release the lock on a task in milliseconds, defaults to 120 seconds
private val MAX_WAIT_FOR_COMMIT = conf.getLong(
"spark.scheduler.outputCommitCoordinator.maxWaitTime", 120000L
) * 1e6.toLong

Can you please name this in a way where the unit is in the name for both config value and variable? So MAX_WAIT_FOR_COMMIT_NANOS and spark.scheduler.outputCommitCoordinator.maxWaitTimeMillis
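A rewrite of the snippet above along the lines this comment asks for might look like the following sketch: put the unit in both the config key and the variable name, and use `TimeUnit` rather than a bare `1e6` multiplier so the conversion is explicit. (The literal default stands in for the `conf.getLong` call, which needs a live `SparkConf`.)

```scala
import java.util.concurrent.TimeUnit

// Hypothetical unit-suffixed naming; in the real coordinator the value
// would come from SparkConf, e.g.
//   conf.getLong("spark.scheduler.outputCommitCoordinator.maxWaitTimeMillis", 120000L)
object CommitTimeoutConfig {
  val maxWaitTimeMillis: Long = 120000L
  val MAX_WAIT_FOR_COMMIT_NANOS: Long =
    TimeUnit.MILLISECONDS.toNanos(maxWaitTimeMillis)
}
```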

* lost), then a subsequent task attempt may be authorized to commit its output.
* If a task attempt has been authorized to commit, then all other attempts to commit
* the same task within spark.scheduler.outputCommitCoordinator.maxWaitTime
* will be denied. If the authorized task attempt fails (e.g. due to its executor being lost),

or preemption

@@ -97,6 +112,31 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
}

/**
* Called by tasks to inform their commit is done.

to inform the OutputCommitCoordinator

private case class InformCommitDone(stage: Int, partition: Int, attemptNumber: Int)

object CommitStatus extends Enumeration {
val NotCommitted, Committing, Committed = Value

change to more distinct names, say Uncommitted, MidCommit, Committed

s"partition=$partition; it is already committed")
false
case CommitState(existingCommitter, startTime, Committing)
if System.nanoTime() - startTime > MAX_WAIT_FOR_COMMIT =>

can you tab this in? I think ./dev/scala-style will flag you on this, and it's unclear that the if goes with the case and not the following statements

logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition")
authorizedCommitters(partition) = attemptNumber
authorizedCommitters(partition) = CommitState(

make this one line?

Author


Can't do, it would exceed 100 characters.

s"partition=$partition; maxWaitTime=$MAX_WAIT_FOR_COMMIT " +
s"reached and prior lock released for attemptId=$existingCommitter")
authorizedCommitters(partition) = CommitState(
attemptNumber, System.nanoTime(), Committing

make this one line?

logDebug(s"Marking attemptNumber=$attemptNumber for stage=$stage, " +
s"partition=$partition as committed")
authorizedCommitters(partition) = CommitState(
attemptNumber, startTime, Committed

make this one line?

)
true
case CommitState(committer, startTime, status) =>
logWarning(s"Bad state on attemptNumber=$attemptNumber for stage=$stage, " +

Can you break this out to separately handle when a commit attempt happens when the partition has already been committed vs a commit attempt by an unauthorized committer vs when the partition is not yet in MidCommit state?

This has been tricky to debug so if we see it again I want to get really nice and verbose logs

true
case existingCommitter =>
case CommitState(existingCommitter, startTime, _) =>
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +

can you put a reason for why this was denied here?

@ash211

ash211 commented Jan 23, 2017

@onursatici is this something you guys can work through this week?

@onursatici changed the title from "[WIP] Os/commit denied race condition" to "Os/commit denied race condition" on Jan 30, 2017
@robert3005

@ash211 can we merge this?

@onursatici
Author

onursatici commented Feb 15, 2017 via email

@onursatici
Author

It turns out the exception we got was unrelated to this PR; a fix for that is here: apache#16959

@gregakinman

Hey @onursatici is anything blocking this? Would like to see this fixed if possible.

@robert3005

robert3005 commented Mar 6, 2017 via email

@robert3005 robert3005 closed this Mar 6, 2017
@robert3005 robert3005 deleted the os/commit-denied-race-condition branch March 6, 2017 16:06
fsamuel-bs pushed a commit to fsamuel-bs/spark that referenced this pull request Oct 8, 2020
…wo plans are the same

### What changes were proposed in this pull request?
This PR combines the current plan and the initial plan in the AQE query plan string when the two plans are the same. It also removes the `== Current Plan ==` and `== Initial Plan ==` headers:

Before
```scala
AdaptiveSparkPlan isFinalPlan=false
+- == Current Plan ==
   SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=palantir#94]
            ...
+- == Initial Plan ==
   SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=palantir#94]
            ...
```
After
```scala
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=palantir#94]
            ...
```
For SQL `EXPLAIN` output:
Before
```scala
AdaptiveSparkPlan (8)
+- == Current Plan ==
   Sort (7)
   +- Exchange (6)
      ...
+- == Initial Plan ==
   Sort (7)
   +- Exchange (6)
      ...
```
After
```scala
AdaptiveSparkPlan (8)
+- Sort (7)
   +- Exchange (6)
      ...
```

### Why are the changes needed?
To simplify the AQE plan string by removing the redundant plan information.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Modified the existing unit test.

Closes apache#29915 from allisonwang-db/aqe-explain.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Xiao Li <[email protected]>