diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index fee515dc07164b..6df8060f3a311b 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -526,20 +526,14 @@ def deactivate_stale_dags( dags_parsed = session.execute(query) for dag in dags_parsed: - # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder, - # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact - # on standalone DAG processors. - dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory # The largest valid difference between a DagFileStat's last_finished_time and a DAG's # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is # no longer present in the file. We have a stale_dag_threshold configured to prevent a # significant delay in deactivation of stale dags when a large timeout is configured - dag_removed_from_dag_folder_or_file = ( + if ( dag.fileloc in last_parsed and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc] - ) - - if dag_not_in_current_dag_folder or dag_removed_from_dag_folder_or_file: + ): cls.logger().info("DAG %s is missing and will be deactivated.", dag.dag_id) to_deactivate.add(dag.dag_id) diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 9b8437d77d50a0..8112b7222a6973 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -26,7 +26,6 @@ import random import socket import sys -import tempfile import textwrap import threading import time @@ -639,7 +638,7 @@ def test_scan_stale_dags(self): manager = DagProcessorJobRunner( job=Job(), processor=DagFileProcessorManager( - dag_directory=str(TEST_DAG_FOLDER), + dag_directory="directory", max_runs=1, processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), @@ -713,11 +712,11 @@ def test_scan_stale_dags_standalone_mode(self): """ Ensure only dags from current dag_directory are updated """ - dag_directory = str(TEST_DAG_FOLDER) + dag_directory = "directory" manager = DagProcessorJobRunner( job=Job(), processor=DagFileProcessorManager( - dag_directory=TEST_DAG_FOLDER, + dag_directory=dag_directory, max_runs=1, processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), @@ -741,7 +740,7 @@ def test_scan_stale_dags_standalone_mode(self): # Add stale DAG to the DB other_dag = other_dagbag.get_dag("test_start_date_scheduling") other_dag.last_parsed_time = timezone.utcnow() - other_dag.sync_to_db(processor_subdir="/other") + other_dag.sync_to_db(processor_subdir="other") # Add DAG to the file_parsing_stats stat = DagFileStat( @@ -763,69 +762,6 @@ def test_scan_stale_dags_standalone_mode(self): active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() assert active_dag_count == 1 - @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode - def test_scan_stale_dags_when_dag_folder_change(self): - """ - Ensure dags from old dag_folder is marked as stale when dag processor - is running as part of scheduler. - """ - - def get_dag_string(filename) -> str: - return open(TEST_DAG_FOLDER / filename).read() - - with tempfile.TemporaryDirectory() as tmpdir: - old_dag_home = tempfile.mkdtemp(dir=tmpdir) - old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py") - old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode()) - old_dag_file.flush() - new_dag_home = tempfile.mkdtemp(dir=tmpdir) - new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py") - new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode()) - new_dag_file.flush() - - manager = DagProcessorJobRunner( - job=Job(), - processor=DagFileProcessorManager( - dag_directory=new_dag_home, - max_runs=1, - processor_timeout=timedelta(minutes=10), - signal_conn=MagicMock(), - dag_ids=[], - pickle_dags=False, - async_mode=True, - ), - ) - - dagbag = DagBag(old_dag_file.name, read_dags_from_db=False) - other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False) - - with create_session() as session: - # Add DAG from old dah home to the DB - dag = dagbag.get_dag("test_example_bash_operator") - dag.fileloc = old_dag_file.name - dag.last_parsed_time = timezone.utcnow() - dag.sync_to_db(processor_subdir=old_dag_home) - - # Add DAG from new DAG home to the DB - other_dag = other_dagbag.get_dag("test_start_date_scheduling") - other_dag.fileloc = new_dag_file.name - other_dag.last_parsed_time = timezone.utcnow() - other_dag.sync_to_db(processor_subdir=new_dag_home) - - manager.processor._file_paths = [new_dag_file] - - active_dag_count = ( - session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() - ) - assert active_dag_count == 2 - - manager.processor._scan_stale_dags() - - active_dag_count = ( - session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() - ) - assert active_dag_count == 1 - @mock.patch( "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock ) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index c2d5ca1a587b24..372a6ae2d9dfe2 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3581,7 +3581,6 @@ def test_retry_still_in_executor(self, dag_maker): dag_id="test_retry_still_in_executor", schedule="@once", session=session, - fileloc=os.devnull + "/test_retry_still_in_executor.py", ): dag_task1 = BashOperator( task_id="test_retry_handling_op",