From 32f59534cbdb8188e4c8f49d7dfbb4b915eaeb4d Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Fri, 15 Jan 2021 16:40:20 +0000
Subject: [PATCH] Stop creating duplicate Dag File Processors (#13662)

When a dag file is being processed by a Dag File Processor and multiple
callbacks are created for it (via zombies or executor events), the dag
file is added back to the _file_path_queue and the manager launches a
new process for it, which it should not, since that file is already
being processed. This eventually bypasses _parallelism, especially when
some dag files take a long time to process: self._processors is a dict
keyed by file path, so multiple processors for the same file count as
one and the parallelism limit is not enforced.

This addresses the same issue as
https://github.com/apache/airflow/pull/11875, but it does not exclude
file paths that were recently processed or that run at the limit (which
is only used in tests) when callbacks are sent by the Agent. This is by
design, as the execution of callbacks is critical.

One caveat avoids duplicate processors: if a processor already exists
for a file path, that file path is removed from the queue. The
processor that should run the callback for that file path will still
run when the file path is added back to the queue in the next loop.
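For illustration, the parallelism bypass boils down to dict
bookkeeping. Below is a minimal Python sketch (hypothetical names and
values, not the manager's actual code) of why a second processor for
the same file path goes untracked:

    # Simplified sketch of the bookkeeping bug (hypothetical names):
    # _processors is a dict keyed by file path, so a second processor
    # for the same file overwrites the first entry while both OS
    # processes keep running.
    _parallelism = 2
    _processors = {}

    _processors['dag_a.py'] = 'processor-1'
    _processors['dag_a.py'] = 'processor-2'  # replaces the dict entry only

    # The slot check undercounts: it sees one tracked processor even
    # though two processes were launched, so another may be started.
    free_slots = _parallelism - len(_processors)
    print(free_slots)  # 1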
Tests are added to check this behaviour.

closes https://github.com/apache/airflow/issues/13047
closes https://github.com/apache/airflow/pull/11875
---
 airflow/utils/dag_processing.py    | 11 ++++++++-
 tests/utils/test_dag_processing.py | 39 ++++++++++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index e66803a95f7f2..7e98c111a0d24 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -714,7 +714,12 @@ def _add_callback_to_queue(self, request: CallbackRequest):
         self._callback_to_execute[request.full_filepath].append(request)
         # Callback has a higher priority over DAG Run scheduling
         if request.full_filepath in self._file_path_queue:
-            self._file_path_queue.remove(request.full_filepath)
+            # Remove file paths matching request.full_filepath from self._file_path_queue
+            # Since we are already going to use that filepath to run the callback,
+            # there is no need to have the same file path in the queue again
+            self._file_path_queue = [
+                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
+            ]
         self._file_path_queue.insert(0, request.full_filepath)
 
     def _refresh_dag_dir(self):
@@ -988,6 +993,10 @@ def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processors, i.e. processors with the same filepath
+            if file_path in self._processors.keys():
+                continue
+
             callback_to_execute_for_file = self._callback_to_execute[file_path]
             processor = self._processor_factory(
                 file_path, callback_to_execute_for_file, self._dag_ids, self._pickle_dags
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index ffcf00e51a401..ad8ef5a5c95b4 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -142,6 +142,45 @@ def test_max_runs_when_no_files(self):
         child_pipe.close()
         parent_pipe.close()
 
+    @pytest.mark.backend("mysql", "postgres")
+    def test_start_new_processes_with_same_filepath(self):
+        """
+        Test that when a processor already exists for a filepath, a new processor
+        won't be created for that filepath. The filepath is just removed from the queue.
+        """
+        processor_factory_mock = MagicMock()
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_factory=processor_factory_mock,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        file_1 = 'file_1.py'
+        file_2 = 'file_2.py'
+        file_3 = 'file_3.py'
+        manager._file_path_queue = [file_1, file_2, file_3]
+
+        # Mock that only one processor exists. This processor runs with 'file_1'
+        manager._processors[file_1] = MagicMock()
+        # Start New Processes
+        manager.start_new_processes()
+
+        # Because of the config '[scheduler] parsing_processes = 2',
+        # verify that only one extra process is created:
+        # since a processor for 'file_1' already exists,
+        # even though 'file_1' is first in '_file_path_queue',
+        # the new processor is created for 'file_2' and not 'file_1'.
+        processor_factory_mock.assert_called_once_with('file_2.py', [], [], False)
+
+        assert file_1 in manager._processors.keys()
+        assert file_2 in manager._processors.keys()
+        assert [file_3] == manager._file_path_queue
+
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',