
Revert [SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions #24359

Closed · wants to merge 1 commit

Conversation

cloud-fan (Contributor)

What changes were proposed in this pull request?

Our customer has a very complicated job. Sometimes it succeeds, and sometimes it fails with:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4  has failed the maximum allowable number of times: 4.
Most recent failure reason: org.apache.spark.shuffle.FetchFailedException

However, with the patch #23871, the job hangs forever.

When I investigated it, I found that DAGScheduler and TaskSchedulerImpl define stage completion differently. DAGScheduler thinks a stage is completed if all its partitions are marked as completed (for both result stages and shuffle stages). TaskSchedulerImpl thinks a stage's task set is completed when all of its tasks finish (see the code).

Ideally these two definitions should be consistent, but #23871 breaks that. In our customer's Spark log, I found that a stage's task set completes, but the stage never completes. More specifically, DAGScheduler submits a task set for stage 4.1 with 1000 tasks, but the TaskSetManager skips running the first 100 tasks. Later on, the TaskSetManager finishes the other 900 tasks and marks the task set as completed. However, DAGScheduler doesn't agree and hangs forever, waiting for more task completion events for stage 4.1.

With hindsight, I think TaskSchedulerImpl.stageIdToFinishedPartitions is fragile. We need to put in more effort to make sure it stays consistent with DAGScheduler's knowledge. When DAGScheduler moves some partitions from finished back to unfinished, TaskSchedulerImpl.stageIdToFinishedPartitions should be updated as well.
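
To make the mismatch concrete, here is a minimal sketch of the scenario (plain Scala, not the actual scheduler code; all names are illustrative):

```scala
import scala.collection.mutable

// Illustrative model only -- not the real DAGScheduler/TaskSchedulerImpl code.
object CompletionMismatchSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 1000
    // Stale bookkeeping left over from a previous attempt: 100 partitions are
    // still recorded as finished, even though DAGScheduler has moved them back
    // to unfinished after a fetch failure.
    val staleFinishedPartitions = (0 until 100).toSet

    // DAGScheduler submits a task set for all 1000 partitions, but the task
    // set skips the 100 "finished" ones and only tracks 900 tasks.
    val tasksToRun = (0 until numPartitions).filterNot(staleFinishedPartitions)

    // All 900 tracked tasks succeed: the task set is complete...
    val partitionsWithOutput = mutable.BitSet(tasksToRun: _*)
    val taskSetComplete = tasksToRun.forall(partitionsWithOutput)

    // ...but DAGScheduler still waits for all 1000 partitions, forever.
    val stageComplete = (0 until numPartitions).forall(partitionsWithOutput)
    println(s"task set complete: $taskSetComplete, stage complete: $stageComplete")
    // => task set complete: true, stage complete: false
  }
}
```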

This PR reverts #23871; let's think of a more robust idea later.

How was this patch tested?

N/A

Revert "[SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions"

This reverts commit db86ccb.
cloud-fan changed the title from 'Revert "[SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions' to 'Revert [SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions' on Apr 12, 2019

pgandhi999 commented

The revert looks good to me; however, I still do not understand why the job hangs. When a task finishes, we instantly mark the partition of that task as completed in the corresponding TaskSet. PR #23871 focused on updating the partition info in the other TaskSets for the same stage.

TaskSchedulerImpl thinks a stage's task set is completed when all tasks finish (see the code).

So technically, shouldn't this imply that the partition in the same TaskSet has already been marked as completed? Just thinking out loud.

squito (Contributor) commented Apr 12, 2019

Sure, if you've seen a problem with this, we should revert. Can you share any more details about the sequence of events when this goes wrong (perhaps on the JIRA)? But I do think you're absolutely right: we made a major mistake in not considering that the DAGScheduler moves partitions from finished back to unfinished, which we haven't tracked.

SparkQA commented Apr 12, 2019

Test build #104553 has finished for PR 24359 at commit 3483075.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor, Author) commented Apr 14, 2019

@squito the event sequence is pretty simple:

  1. A task of a stage fails with FetchFailure.
  2. TaskSetManager marks the corresponding task set as zombie.
  3. TaskSchedulerImpl does not clean up the stageIdToFinishedPartitions entry of that stage.
  4. DAGScheduler re-submits the stage and marks some finished partitions as unfinished.
  5. TaskSchedulerImpl creates another task set for this stage, but mistakenly marks some partitions as finished.

I think step 3 is the root cause here, but I'm not sure this is the only place that can make TaskSchedulerImpl and DAGScheduler inconsistent.

Anyway, I'm reverting first. Let's think of a better fix later.

cloud-fan closed this in 0bb716b on Apr 14, 2019
cloud-fan (Contributor, Author) commented

#23871 has been reverted from master/2.4/2.3

Ngone51 (Member) commented Apr 15, 2019

Sorry about this, and I agree with reverting it. I also think step 3 is the root cause.

(Though I still feel we could fix it by simply cleaning up stageIdToFinishedPartitions.)
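
A minimal sketch of that idea, assuming a hook that fires once the last task set for a stage goes away (the hook name is hypothetical; only stageIdToFinishedPartitions comes from the actual discussion):

```scala
import scala.collection.mutable

// Hypothetical sketch, not the actual Spark code: drop the finished-partition
// bookkeeping once the last task set for a stage is gone, so a resubmitted
// attempt starts from DAGScheduler's view of which partitions are missing.
object CleanupSketch {
  val stageIdToFinishedPartitions = mutable.Map[Int, mutable.BitSet]()

  def onAllTaskSetsRemoved(stageId: Int): Unit = {
    stageIdToFinishedPartitions -= stageId
  }
}
```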

xuanyuanking added a commit to xuanyuanking/spark that referenced this pull request Apr 22, 2019
cloud-fan added a commit that referenced this pull request Apr 29, 2019
…eption many times

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-25250 reports a bug: a task that fails with `CommitDeniedException` gets retried many times.

This can happen when a stage has two task set managers: one zombie and one active. A task from the zombie TSM completes and commits to a central coordinator (assuming it's a file-writing task). Then the corresponding task from the active TSM fails with `CommitDeniedException`. `CommitDeniedException.countTowardsTaskFailures` is false, so the active TSM keeps retrying this task until the job finishes. This wastes a lot of resources.
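
A minimal sketch of why a failure that does not count towards task failures can be retried indefinitely (a simplified model with made-up names, not the real `TaskSetManager` logic):

```scala
// Simplified model, not the real TaskSetManager retry logic.
object EndlessRetrySketch {
  def main(args: Array[String]): Unit = {
    val maxTaskFailures = 4
    var countedFailures = 0
    var attempts = 0
    // Every attempt fails with a CommitDeniedException-like failure whose
    // countTowardsTaskFailures is false, so the failure counter never moves.
    while (countedFailures < maxTaskFailures && attempts < 1000) {
      attempts += 1
      val countTowardsTaskFailures = false // as for CommitDeniedException
      if (countTowardsTaskFailures) countedFailures += 1
    }
    // Only the artificial attempt cap (standing in for "the job finishes")
    // stops the loop; the task would otherwise be retried indefinitely.
    println(s"attempts: $attempts, counted failures: $countedFailures")
  }
}
```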

#21131 first made a successfully completed task from a zombie `TaskSetManager` mark the task for the same partition as completed in the active `TaskSetManager`. Later, #23871 improved the implementation to cover a corner case where the active `TaskSetManager` hasn't been created yet when a previous task succeeds.

However, #23871 had a bug and was reverted in #24359. With hindsight, #23871 is fragile because we need to sync the state about which partitions are completed between `DAGScheduler` and `TaskScheduler`.

This PR proposes a new fix:
1. When `DAGScheduler` gets a task success event from an earlier attempt, notify the `TaskSchedulerImpl` about it
2. When `TaskSchedulerImpl` knows a partition is already completed, ask the active `TaskSetManager` to mark the corresponding task as finished, if the task is not finished yet.

This fix covers the corner case, because:
1. If `DAGScheduler` gets the task completion event from the zombie TSM before submitting the new stage attempt, then `DAGScheduler` knows that this partition is completed, and it will exclude this partition when creating the task set for the new stage attempt. See `DAGScheduler.submitMissingTasks`
2. If `DAGScheduler` gets the task completion event from zombie TSM after submitting the new stage attempt, then the active TSM is already created.

Compared to the previous fix, the message loop becomes longer, so it's likely that the active task set manager has already retried the task multiple times. But this failure window won't be too big, and we want to avoid the worst case, which retries the task many times until the job finishes. So this solution is acceptable.
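
A simplified sketch of this two-step flow (plain Scala with hypothetical class and method names modeled on the description above, not the real scheduler classes):

```scala
import scala.collection.mutable

// Hypothetical model of the two steps above, not the actual Spark classes.
class ActiveTaskSetModel {
  private val finishedTasks = mutable.BitSet()
  // Step 2: mark the task for an already-completed partition as finished,
  // if it isn't already, so the task set stops retrying it.
  def markPartitionCompleted(partition: Int): Unit = {
    if (!finishedTasks.contains(partition)) {
      finishedTasks += partition
      println(s"stopped retrying task for partition $partition")
    }
  }
}

class TaskSchedulerModel {
  var activeTaskSet: Option[ActiveTaskSetModel] = None
  // Step 1: DAGScheduler notifies the scheduler that a partition finished
  // in an earlier (zombie) attempt.
  def notifyPartitionCompletion(stageId: Int, partition: Int): Unit =
    activeTaskSet.foreach(_.markPartitionCompleted(partition))
}

object NewFixSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = new TaskSchedulerModel
    scheduler.activeTaskSet = Some(new ActiveTaskSetModel)
    scheduler.notifyPartitionCompletion(stageId = 4, partition = 7)
  }
}
```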

## How was this patch tested?

A new test case.

Closes #24375 from cloud-fan/fix2.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023