Stop creating duplicate Dag File Processors #13662

Merged (2 commits) on Jan 15, 2021
11 changes: 10 additions & 1 deletion airflow/utils/dag_processing.py
@@ -714,7 +714,12 @@ def _add_callback_to_queue(self, request: CallbackRequest):
         self._callback_to_execute[request.full_filepath].append(request)
         # Callback has a higher priority over DAG Run scheduling
         if request.full_filepath in self._file_path_queue:
-            self._file_path_queue.remove(request.full_filepath)
+            # Remove file paths matching request.full_filepath from self._file_path_queue
+            # Since we are already going to use that filepath to run the callback,
+            # there is no need to have the same file path in the queue again
+            self._file_path_queue = [
+                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
+            ]
         self._file_path_queue.insert(0, request.full_filepath)
 
     def _refresh_dag_dir(self):
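
Note that the list comprehension removes every occurrence of the path, whereas the replaced `list.remove` call would only have dropped the first one. A minimal standalone sketch of the dedupe-then-prioritize pattern used here (the `queue` and `path` names are illustrative stand-ins, not Airflow internals):

    # Drop every occurrence of 'path' (list.remove would drop only the first),
    # then put it at the front so its callback is processed next.
    queue = ['a.py', 'b.py', 'a.py', 'c.py']
    path = 'a.py'
    queue = [p for p in queue if p != path]
    queue.insert(0, path)
    assert queue == ['a.py', 'b.py', 'c.py']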
@@ -988,6 +993,10 @@ def start_new_processes(self):
"""Start more processors if we have enough slots and files to process"""
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.pop(0)
# Stop creating duplicate processor i.e. processor with the same filepath
if file_path in self._processors.keys():
continue

callback_to_execute_for_file = self._callback_to_execute[file_path]
processor = self._processor_factory(
file_path, callback_to_execute_for_file, self._dag_ids, self._pickle_dags
Expand Down
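
Taken together with the queue dedupe in the first hunk, this guard guarantees at most one processor per file: a path whose processor is already in `self._processors` is skipped rather than spawned a second time. A small self-contained sketch of the same skip pattern (`processors`, `queue`, and `parallelism` are illustrative stand-ins, not Airflow's actual state):

    processors = {'file_1.py': object()}   # path -> running processor
    queue = ['file_1.py', 'file_2.py']
    parallelism = 2

    while parallelism - len(processors) > 0 and queue:
        file_path = queue.pop(0)
        if file_path in processors:   # a processor for this path already runs: skip
            continue
        processors[file_path] = object()   # stand-in for spawning a real processor

    assert sorted(processors) == ['file_1.py', 'file_2.py']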
39 changes: 39 additions & 0 deletions tests/utils/test_dag_processing.py
@@ -142,6 +142,45 @@ def test_max_runs_when_no_files(self):
         child_pipe.close()
         parent_pipe.close()
 
+    @pytest.mark.backend("mysql", "postgres")
+    def test_start_new_processes_with_same_filepath(self):
+        """
+        Test that when a processor already exists for a filepath, a new processor
+        won't be created for that filepath. The filepath is just removed from the queue.
+        """
+        processor_factory_mock = MagicMock()
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_factory=processor_factory_mock,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        file_1 = 'file_1.py'
+        file_2 = 'file_2.py'
+        file_3 = 'file_3.py'
+        manager._file_path_queue = [file_1, file_2, file_3]
+
+        # Mock that only one processor exists. This processor runs with 'file_1'
+        manager._processors[file_1] = MagicMock()
+        # Start new processes
+        manager.start_new_processes()
+
+        # Because of the config '[scheduler] parsing_processes = 2', verify that
+        # only one extra processor is created, and since a processor with 'file_1'
+        # already exists, even though 'file_1' is first in '_file_path_queue',
+        # the new processor is created with 'file_2' and not 'file_1'.
+        processor_factory_mock.assert_called_once_with('file_2.py', [], [], False)
+
+        assert file_1 in manager._processors.keys()
+        assert file_2 in manager._processors.keys()
+        assert [file_3] == manager._file_path_queue
+
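
Note that the assertions above depend on the scheduler being limited to two parsing processes, yet the test as shown does not set that config itself. In Airflow's test suite such settings are usually pinned per test; a hedged sketch, assuming the `conf_vars` helper from `tests.test_utils.config`:

    from tests.test_utils.config import conf_vars

    # Pin the parallelism the test's comments rely on (sketch, not part of this diff).
    @conf_vars({('scheduler', 'parsing_processes'): '2'})
    def test_start_new_processes_with_same_filepath(self):
        ...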
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',