FIX Slow cleared tasks would be adopted by Celery. #16718
Conversation
I was wondering if we could improve the query somehow by checking whether the orphaned task came from one of the executors that died. What do you think?
```diff
@@ -172,6 +172,7 @@ def clear_task_instances(
         # original max_tries or the last attempted try number.
         ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
         ti.state = State.NONE
+        ti.external_executor_id = None
```
Could you check this in the unit tests so we don't regress?
Good one! Will add it.
Updated the current test to also verify that the external_executor_id is reset.
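For reference, a minimal pytest-style sketch of the kind of assertion added here. This is a simplified illustration, not the real test: the `dag`, `session`, and `ti` fixtures are placeholders for setup that creates a DagRun with one executed task instance against a working Airflow metadata DB.

```python
# Sketch only: assumes fixtures that provide a DAG, a SQLAlchemy session,
# and a TaskInstance that already ran (not shown here).
from airflow.models.taskinstance import clear_task_instances
from airflow.utils.state import State


def test_clear_resets_external_executor_id(dag, session, ti):
    # Simulate a task that was picked up by an executor at some point.
    ti.external_executor_id = "some-celery-task-id"
    ti.state = State.SUCCESS
    session.merge(ti)
    session.commit()

    clear_task_instances([ti], session, dag=dag)
    session.refresh(ti)

    # After clearing, the task must no longer look adoptable.
    assert ti.state == State.NONE
    assert ti.external_executor_id is None
```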
We could if we need to, but the reason I didn't is double adoption: if a scheduler adopts tasks and then dies, we want all those tasks (its own and the ones it adopted) to be adopted again.
Yes, alright. It sounds like we should just make sure that cleared tasks are not being picked up.
Force-pushed from 6763996 to 0281ed7.
The test failures seem random. Can someone rerun the failing parts? :)
@Jorricks We seem to have a general problem with kind tests in GitHub public runners (#16736); for the other failures, you can re-run them yourself.
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
Well done 👏
Have rebased the PR on main 🤞
Force-pushed from 0281ed7 to 73cd253.
The Celery executor currently adopts anything that has ever run before and has been cleared since then.

**Example of the issue:** We have a DAG that runs over 150 sensor tasks and 50 ETL tasks while having a concurrency of 3 and max_active_runs of 16. This setup is required because we want to divide the resources and don't want this DAG to take up all of them. Many tasks therefore sit in the scheduled state for a while, since they can't be queued due to the concurrency of 3. However, with the current implementation, if these tasks have ever run before, they get adopted by the scheduler's executor instance and become stuck forever [without this PR](#16550). They should never have been adopted in the first place.

**Contents of the PR:**
1. Tasks in the scheduled state should never have reached an executor, so the scheduled state is removed from the states eligible for adoption.
2. Given that a task instance's `external_executor_id` is important in deciding whether it is adopted, it is also reset whenever the state of the TaskInstance is reset.

(cherry picked from commit 554a239)
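To illustrate the intent of both changes, here is a simplified, hypothetical adoption rule. This is a sketch only, not the actual scheduler or executor code; the real logic lives in the scheduler/executor modules and differs in detail.

```python
# Simplified illustration of the adoption rule this PR aims for.
from airflow.utils.state import State


def is_adoptable(ti) -> bool:
    """Return True if a lost task instance should be adopted by a live executor."""
    # 1. Scheduled tasks never reached an executor, so there is nothing to adopt.
    if ti.state not in (State.QUEUED, State.RUNNING):
        return False
    # 2. Cleared tasks have their external_executor_id reset to None (see the
    #    clear_task_instances change above), so they are not adopted either.
    return ti.external_executor_id is not None
```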