Stop creating duplicate Dag File Processors (#13662)
When a DAG file is being processed by a Dag File Processor and multiple
callbacks are created for it (via zombies or executor events), the file
is re-added to _file_path_queue and the manager launches a new process
for it, which it should not, since the file is already being processed.
Because self._processors is just a dict keyed by file path, multiple
processors for the same file count as a single entry, so the manager
keeps spawning processes and eventually bypasses _parallelism,
especially when some DAG files take a long time to process.
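
The accounting bug is easiest to see in isolation. Below is a minimal,
hypothetical sketch with a plain dict and list standing in for
self._processors and _file_path_queue; it is not the real manager code:

    # Simplified model of the pre-fix behaviour (illustrative only).
    processors = {}   # stands in for self._processors: {file_path: processor}
    parallelism = 2   # stands in for self._parallelism
    queue = ['dag_a.py', 'dag_a.py', 'dag_a.py']  # same file re-queued by callbacks

    spawned = 0
    while parallelism - len(processors) > 0 and queue:
        file_path = queue.pop(0)
        processors[file_path] = object()  # duplicate keys overwrite each other
        spawned += 1

    # Three processes were launched, but the dict reports only one, so
    # the parallelism check never saw the extra two.
    assert spawned == 3 and len(processors) == 1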

This addresses the same issue as #11875, but unlike that change it does
not exclude file paths that were recently processed or that are at the
run limit (which is only used in tests) when callbacks are sent by the
Agent. This is by design, as the execution of callbacks is critical. It
comes with one caveat to avoid duplicate processors: if a processor
already exists for a file path, the path is removed from the queue, and
the processor that runs its callback will be launched when the file
path is added to the queue again on the next loop.
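
A rough walkthrough of that caveat over two manager loops, again with
hypothetical stand-ins (the real logic lives in
DagFileProcessorManager.start_new_processes):

    processors = {'dag_a.py': object()}  # dag_a.py is still being processed
    queue = ['dag_a.py', 'dag_b.py']     # a callback put dag_a.py at the front
    parallelism = 2

    def start_new_processes():
        while parallelism - len(processors) > 0 and queue:
            file_path = queue.pop(0)
            if file_path in processors:  # the new guard: drop, don't duplicate
                continue
            processors[file_path] = object()

    start_new_processes()
    assert 'dag_a.py' not in queue       # dropped for this loop

    del processors['dag_a.py']           # the in-flight processor finishes
    queue.append('dag_a.py')             # the path is re-queued on the next loop
    start_new_processes()
    assert 'dag_a.py' in processors      # the callback run happens now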

Tests are added to verify this behaviour.

closes #13047
closes #11875

(cherry picked from commit 32f5953)
kaxil committed Jan 21, 2021
1 parent b98a61c commit b446d14
Showing 2 changed files with 49 additions and 1 deletion.
11 changes: 10 additions & 1 deletion airflow/utils/dag_processing.py
@@ -714,7 +714,12 @@ def _add_callback_to_queue(self, request: CallbackRequest):
         self._callback_to_execute[request.full_filepath].append(request)
         # Callback has a higher priority over DAG Run scheduling
         if request.full_filepath in self._file_path_queue:
-            self._file_path_queue.remove(request.full_filepath)
+            # Remove file paths matching request.full_filepath from self._file_path_queue
+            # Since we are already going to use that filepath to run callback,
+            # there is no need to have same file path again in the queue
+            self._file_path_queue = [
+                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
+            ]
         self._file_path_queue.insert(0, request.full_filepath)
 
     def _refresh_dag_dir(self):
@@ -988,6 +993,10 @@ def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processor i.e. processor with the same filepath
+            if file_path in self._processors.keys():
+                continue
+
             callback_to_execute_for_file = self._callback_to_execute[file_path]
             processor = self._processor_factory(
                 file_path, callback_to_execute_for_file, self._dag_ids, self._pickle_dags
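
One design note on the change in _add_callback_to_queue: list.remove()
deletes only the first occurrence, while the new list comprehension
clears every occurrence before the path is re-inserted at the front,
which matters if callbacks had already queued the same path more than
once. A quick plain-Python illustration (not Airflow code):

    queue = ['a.py', 'b.py', 'a.py']
    queue.remove('a.py')                       # old behaviour -> ['b.py', 'a.py']

    queue = ['a.py', 'b.py', 'a.py']
    queue = [p for p in queue if p != 'a.py']  # new behaviour -> ['b.py']
    queue.insert(0, 'a.py')                    # -> ['a.py', 'b.py'], one copy
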
39 changes: 39 additions & 0 deletions tests/utils/test_dag_processing.py
@@ -142,6 +142,45 @@ def test_max_runs_when_no_files(self):
         child_pipe.close()
         parent_pipe.close()
 
+    @pytest.mark.backend("mysql", "postgres")
+    def test_start_new_processes_with_same_filepath(self):
+        """
+        Test that when a processor already exists with a filepath, a new processor won't be created
+        with that filepath. The filepath will just be removed from the list.
+        """
+        processor_factory_mock = MagicMock()
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_factory=processor_factory_mock,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        file_1 = 'file_1.py'
+        file_2 = 'file_2.py'
+        file_3 = 'file_3.py'
+        manager._file_path_queue = [file_1, file_2, file_3]
+
+        # Mock that only one processor exists. This processor runs with 'file_1'
+        manager._processors[file_1] = MagicMock()
+        # Start New Processes
+        manager.start_new_processes()
+
+        # Because of the config '[scheduler] parsing_processes = 2',
+        # verify that only one extra process is created,
+        # and since a processor with 'file_1' already exists,
+        # even though it is first in '_file_path_queue',
+        # a new processor is created with 'file_2' and not 'file_1'.
+        processor_factory_mock.assert_called_once_with('file_2.py', [], [], False)
+
+        assert file_1 in manager._processors.keys()
+        assert file_2 in manager._processors.keys()
+        assert [file_3] == manager._file_path_queue
+
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',
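
For anyone verifying locally, the new test should be runnable with
something like the following (assuming an environment with a MySQL or
Postgres backend configured, since the test is marked
@pytest.mark.backend("mysql", "postgres")):

    pytest tests/utils/test_dag_processing.py -k test_start_new_processes_with_same_filepath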
