Stop creating duplicate Dag File Processors #13662
Conversation
tests/utils/test_dag_processing.py (outdated)

```python
assert file_1 in manager._processors.keys()
assert file_2 in manager._processors.keys()
assert [file_3] == manager._file_path_queue
```
Check processor_factory is only called once
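A minimal, self-contained sketch of what such a check could look like, assuming a simplified manager loop. `processor_factory`, `processors`, and `file_path_queue` here are illustrative stand-ins, not the real test harness from tests/utils/test_dag_processing.py:

```python
from unittest import mock

def test_factory_called_once_per_path():
    # Stand-in for the callable that spawns a DAG file processor.
    processor_factory = mock.MagicMock()
    processors = {}  # file path -> running processor, as in manager._processors
    file_path_queue = ["file_1.py", "file_1.py", "file_2.py"]  # duplicate on purpose

    for path in file_path_queue:
        if path in processors:
            continue  # a processor already handles this file; do not spawn another
        processors[path] = processor_factory(path)

    # One call per distinct file path, despite the duplicate queue entry.
    assert processor_factory.call_count == 2

test_factory_called_once_per_path()
```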
The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.
When a dag file is executed via Dag File Processors and multiple callbacks are created, either via zombies or executor events, the dag file is added to the _file_path_queue and the manager launches a new process to process it, which it should not, since the dag file is already being processed. This eventually bypasses _parallelism, especially when some dag files take a long time to process.

This addresses the same issue as apache#11875, but instead does not exclude file paths that were recently processed or that run at the limit (which is only used in tests) when Callbacks are sent by the Agent. This is by design, as the execution of Callbacks is critical. It comes with a caveat to avoid duplicate processors: if a processor already exists, instead of removing the file path from the queue, the file path is moved from the front of the queue to the end. This means the processor for the file path with the callback still runs before other file paths are added.

Tests are added to check the same.

closes apache#13047
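A rough sketch of that requeueing caveat, under the assumption of a simplified queue. The name `rotate_busy_to_back` and the wiring are illustrative, not the actual manager code:

```python
from collections import deque

def rotate_busy_to_back(queue, running):
    """Keep relative order, but move any path that already has a live
    processor to the back of the queue so it is retried after this pass."""
    ready = [p for p in queue if p not in running]
    busy = [p for p in queue if p in running]
    return deque(ready + busy)

queue = deque(["file_1.py", "file_2.py", "file_3.py"])
running = {"file_1.py": object()}  # file_1.py is still being processed
print(rotate_busy_to_back(queue, running))
# deque(['file_2.py', 'file_3.py', 'file_1.py'])
```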
Force-pushed from b7fd374 to 8a9fb4e
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest master at your convenience, or amend the last commit of the PR and push it with --force-with-lease.
When a dag file is executed via Dag File Processors and multiple callbacks are created, either via zombies or executor events, the dag file is added to the _file_path_queue and the manager launches a new process to process it, which it should not, since the dag file is already being processed. This eventually bypasses _parallelism, especially when some dag files take a long time to process: self._processors is just a dict with the file path as the key, so multiple processors with the same key count as one, and parallelism is bypassed.

This addresses the same issue as #11875, but instead does not exclude file paths that were recently processed or that run at the limit (which is only used in tests) when Callbacks are sent by the Agent. This is by design, as the execution of Callbacks is critical. It comes with a caveat to avoid duplicate processors: if a processor already exists, the file path is removed from the queue. The processor for the file path with the callback will still run when the file path is added again in the next loop.

Tests are added to check the same.

closes #13047, #11875
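A minimal sketch of that final behaviour, assuming a simplified queue refill step. `prune_queue` is a hypothetical helper, not the real DagFileProcessorManager method; it mirrors the assertions in the test snippet above:

```python
def prune_queue(file_path_queue, running_processors):
    """Drop file paths that already have a live processor; the manager's next
    scan loop re-adds them, so their callbacks are deferred rather than lost."""
    return [path for path in file_path_queue if path not in running_processors]

queue = ["file_1.py", "file_2.py", "file_3.py"]
running = {"file_1.py": object(), "file_2.py": object()}  # live processors
assert prune_queue(queue, running) == ["file_3.py"]
```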
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of a fundamental code change, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards-incompatible changes, please leave a note in UPDATING.md.