[AIRFLOW-6175] fixes scheduler queue bug #6732
Need a Jira for this please @dimberman
tests/jobs/test_scheduler_job.py
Outdated
executor=executor,
subdir=os.path.join(settings.DAGS_FOLDER,
                    "no_dags.py"))
mock.patch.object(scheduler, '_change_state_for_tis_without_dagrun').start()
Do we need to patch this?
Yes. Otherwise the tasks won't run because they don't have an associated DagRun.
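A minimal sketch of the patching pattern discussed in this thread, assuming a toy class in place of the real scheduler job. ToyScheduler, its task_states dict, and the behaviour of its _change_state_for_tis_without_dagrun are hypothetical stand-ins; only mock.patch.object(...).start() and .stop() are real unittest.mock calls. The point is that a patcher started this way replaces the method with a mock until it is stopped, so the state reset never happens during the test.

```python
from unittest import mock


class ToyScheduler:
    """Hypothetical stand-in for the scheduler job under test."""

    def __init__(self):
        self.task_states = {"task_a": "scheduled"}

    def _change_state_for_tis_without_dagrun(self):
        # Toy behaviour (an assumption): tasks with no DagRun lose their state.
        for key in self.task_states:
            self.task_states[key] = None

    def _process_dags(self):
        self._change_state_for_tis_without_dagrun()
        return dict(self.task_states)


scheduler = ToyScheduler()

# start() installs the mock; stop() restores the original method.
patcher = mock.patch.object(scheduler, "_change_state_for_tis_without_dagrun")
patched = patcher.start()
try:
    result_while_patched = scheduler._process_dags()
finally:
    patcher.stop()

# With the patch removed, the toy reset runs again.
result_after_stop = scheduler._process_dags()
```

Because start() has no matching with-block, the test has to call stop() itself (or mock.patch.stopall() in teardown), otherwise the patch can leak into later tests.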
@ashb this bug has a Jira: https://issues.apache.org/jira/browse/AIRFLOW-6175
We have merge conflicts :( - can you fix that please
96c24aa to eb42612
Thanks for fixing this! I'm wondering why the executor starts to run tasks in the scheduled state. I thought any task sent to the executor would be removed from queued_tasks and moved to running_tasks, so its state would not be set back to scheduled.
Co-Authored-By: Kaxil Naik <[email protected]>
tests/jobs/test_scheduler_job.py
Outdated
ti.state = State.SCHEDULED
tis.append(ti)
session.merge(ti)
session.commit()
Suggested change: remove session.commit(). create_session would commit the session when exiting the context manager.
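To illustrate why the explicit commit is redundant, here is a rough sketch of a commit-on-exit context manager. It is not Airflow's code: ToySession and create_session_sketch are hypothetical names, and the session only records which calls it received.

```python
from contextlib import contextmanager


class ToySession:
    """Hypothetical session that records the calls made on it."""

    def __init__(self):
        self.events = []

    def merge(self, obj):
        self.events.append(f"merge:{obj}")

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")

    def close(self):
        self.events.append("close")


@contextmanager
def create_session_sketch(session):
    """Commit on a clean exit, roll back on error, always close."""
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


session = ToySession()
with create_session_sketch(session) as s:
    s.merge("ti_1")  # no explicit commit inside the block
```

After the block exits, session.events contains the merge followed by a single commit and a close, which is why calling commit inside the block adds nothing.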
tests/jobs/test_scheduler_job.py
Outdated
"no_dags.py"))
mock.patch.object(scheduler, '_change_state_for_tis_without_dagrun').start()
scheduler._process_dags(simple_dag_bag)
[ti.refresh_from_db() for ti in tis]
Suggested change: replace
    [ti.refresh_from_db() for ti in tis]
with
    for ti in tis:
        ti.refresh_from_db()
as we don't need to create a list.
Minor suggestion, otherwise LGTM
Let me answer my own question... We change a task from SCHEDULED to QUEUED only when its DAG is in the simple_dagbag, so the leftover tasks in queued_tasks won't be set back to QUEUED until the next parse, but they are still picked up by the executor and then fail the deps check. Thanks again for fixing my bad 🙏
And to understand the impact completely: the bug was causing the scheduler/executor to slow down quite a lot when traffic >> executor capacity, but not to block completely, right? Those SCHEDULED tasks in queued_tasks would slowly be moved out by _execute_task_instances.
Following tests are failing:
Codecov Report
@@            Coverage Diff             @@
##           master    #6732      +/-   ##
==========================================
- Coverage   84.45%   84.23%   -0.23%
==========================================
  Files         672      672
  Lines       38081    38089       +8
==========================================
- Hits        32163    32084      -79
- Misses       5918     6005      +87
Nice one! Let's get it merged. We have some other "global" updates to this code which aim to increase readability (pylint + type annotations) so we should - I think - merge those fairly quickly.
@potiuk @KevinYang21 wanna give a final lookover before merging?
Oops, just found a debug line that I forgot to remove: c0ea272#diff-d7e6d11bfaeca087149ad577dd7704daR68. Otherwise LGTM. Will push a small commit to fix it. If we're all good I can merge it after the CI passes.
…6732) (cherry-picked from commit f3bb4c3) Co-Authored-By: Kaxil Naik <[email protected]> Co-Authored-By: Kevin Yang <[email protected]>
…pache#6732) There is a bug caused by the scheduler_jobs refactor which leads to task failure and scheduler locking. Essentially, when there is an overflow of tasks going into the scheduler, the tasks are set back to scheduled but are not removed from the executor's queued_tasks queue. This means that the executor will attempt to run tasks that are in the scheduled state, and those tasks will fail dependency checks. Eventually the queue fills with scheduled tasks, and the scheduler can no longer run. Co-Authored-By: Kaxil Naik <[email protected]>, Kevin Yang <[email protected]>
Original PR: apache/airflow#6732 Change-Id: I1f8d8c277c54a9b1e2546cf1b37502b64f6c0b5f GitOrigin-RevId: 2651459c7994aae7dd3ef4808f00bb39293ea8a3
Jira
Description
There is a bug caused by the scheduler_jobs refactor which leads to task failure and scheduler locking.
Essentially, when there is an overflow of tasks going into the scheduler, the tasks are set back to scheduled but are not removed from the executor's queued_tasks queue.
This means that the executor will attempt to run tasks that are in the scheduled state, and those tasks will fail dependency checks. Eventually the queue fills with scheduled tasks, and the scheduler can no longer run.
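The failure mode described above can be reproduced in a toy model. This is only a sketch under stated assumptions: ToyExecutor, reset_overflow, and simulate are hypothetical names, states are plain strings, and the "dependency check" is reduced to "the task must be in the queued state". The remove_from_queue flag models the remedy the description implies (dropping reset tasks from queued_tasks); it is not taken from the actual patch.

```python
SCHEDULED, QUEUED, RUNNING = "scheduled", "queued", "running"


class ToyExecutor:
    """Hypothetical executor with a fixed number of slots."""

    def __init__(self, parallelism):
        self.parallelism = parallelism
        self.queued_tasks = {}  # insertion-ordered: key -> None
        self.running = set()
        self.failed_dep_checks = 0

    def queue(self, key):
        self.queued_tasks[key] = None

    def heartbeat(self, states):
        open_slots = self.parallelism - len(self.running)
        for key in list(self.queued_tasks)[:open_slots]:
            self.queued_tasks.pop(key)
            if states[key] != QUEUED:
                # Simplified dependency check: wrong state, slot wasted.
                self.failed_dep_checks += 1
                continue
            states[key] = RUNNING
            self.running.add(key)


def reset_overflow(states, executor, remove_from_queue):
    """Set tasks the executor could not start back to scheduled."""
    for key in list(executor.queued_tasks):
        states[key] = SCHEDULED
        if remove_from_queue:
            executor.queued_tasks.pop(key)


def simulate(remove_from_queue):
    states = {f"task_{i}": QUEUED for i in range(5)}
    executor = ToyExecutor(parallelism=2)
    for key in states:
        executor.queue(key)
    executor.heartbeat(states)  # only two of five tasks fit
    reset_overflow(states, executor, remove_from_queue)
    executor.running.clear()  # pretend the running tasks finished
    executor.heartbeat(states)
    return executor.failed_dep_checks, len(executor.queued_tasks)


buggy = simulate(remove_from_queue=False)
fixed = simulate(remove_from_queue=True)
```

In the buggy run the second heartbeat spends both free slots on tasks that are in the scheduled state and one stale entry still sits in the queue; in the fixed run no slot is wasted and the queue is empty.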
Tests
Commits
Documentation