diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index e66803a95f7f2..7e98c111a0d24 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -714,7 +714,12 @@ def _add_callback_to_queue(self, request: CallbackRequest):
         self._callback_to_execute[request.full_filepath].append(request)
         # Callback has a higher priority over DAG Run scheduling
         if request.full_filepath in self._file_path_queue:
-            self._file_path_queue.remove(request.full_filepath)
+            # Remove file paths matching request.full_filepath from self._file_path_queue
+            # Since we are already going to use that filepath to run the callback,
+            # there is no need to have the same file path again in the queue
+            self._file_path_queue = [
+                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
+            ]
         self._file_path_queue.insert(0, request.full_filepath)

     def _refresh_dag_dir(self):
@@ -988,6 +993,10 @@ def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processors, i.e. processors with the same filepath
+            if file_path in self._processors.keys():
+                continue
+
             callback_to_execute_for_file = self._callback_to_execute[file_path]
             processor = self._processor_factory(
                 file_path, callback_to_execute_for_file, self._dag_ids, self._pickle_dags
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index ffcf00e51a401..ad8ef5a5c95b4 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -142,6 +142,45 @@ def test_max_runs_when_no_files(self):
         child_pipe.close()
         parent_pipe.close()

+    @pytest.mark.backend("mysql", "postgres")
+    def test_start_new_processes_with_same_filepath(self):
+        """
+        Test that when a processor already exists with a filepath, a new processor won't be created
+        with that filepath. The filepath will just be removed from the list.
+        """
+        processor_factory_mock = MagicMock()
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_factory=processor_factory_mock,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        file_1 = 'file_1.py'
+        file_2 = 'file_2.py'
+        file_3 = 'file_3.py'
+        manager._file_path_queue = [file_1, file_2, file_3]
+
+        # Mock that only one processor exists. This processor runs with 'file_1'
+        manager._processors[file_1] = MagicMock()
+        # Start new processes
+        manager.start_new_processes()
+
+        # Because of the config '[scheduler] parsing_processes = 2',
+        # verify that only one extra process is created,
+        # and since a processor with 'file_1' already exists,
+        # even though it is first in '_file_path_queue',
+        # a new processor is created with 'file_2' and not 'file_1'.
+        processor_factory_mock.assert_called_once_with('file_2.py', [], [], False)
+
+        assert file_1 in manager._processors.keys()
+        assert file_2 in manager._processors.keys()
+        assert [file_3] == manager._file_path_queue
+
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',
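
For context, here is a minimal standalone sketch (not part of the PR) of the two behaviours the diff introduces: deduplicating-then-prioritising a file path in the parse queue, and skipping paths that already have a running processor. Function and variable names (`prioritize_callback_path`, `start_new_processes`, `make_processor`) are simplified stand-ins for the `DagFileProcessorManager` internals, not the real Airflow API.

```python
# Standalone sketch with hypothetical names; mirrors the logic of the diff,
# not the actual DagFileProcessorManager implementation.

def prioritize_callback_path(file_path_queue, full_filepath):
    """Drop every occurrence of full_filepath, then push it to the front."""
    # The list comprehension removes *all* matching entries, unlike
    # list.remove(), which removes only the first occurrence.
    file_path_queue = [p for p in file_path_queue if p != full_filepath]
    file_path_queue.insert(0, full_filepath)
    return file_path_queue


def start_new_processes(file_path_queue, processors, parallelism, make_processor):
    """Pop paths and start processors, skipping paths that already have one."""
    while parallelism - len(processors) > 0 and file_path_queue:
        file_path = file_path_queue.pop(0)
        if file_path in processors:  # the duplicate-processor guard from the diff
            continue
        processors[file_path] = make_processor(file_path)


# Example mirroring the test case above:
queue = ['file_1.py', 'file_2.py', 'file_3.py']
procs = {'file_1.py': object()}  # 'file_1.py' already has a processor
start_new_processes(queue, procs, parallelism=2, make_processor=lambda p: object())
assert sorted(procs) == ['file_1.py', 'file_2.py']
assert queue == ['file_3.py']
```

The example reproduces the test's expectations: with `parallelism=2` and one existing processor, `'file_1.py'` is popped but skipped, `'file_2.py'` gets the single free slot, and `'file_3.py'` remains queued.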